From 84553d90ed16f9d89c40e777a6f6d8e76acccf67 Mon Sep 17 00:00:00 2001 From: Trent Mohay <37158202+rain-on@users.noreply.github.com> Date: Mon, 25 Feb 2019 21:35:03 +1100 Subject: [PATCH 1/8] Remove start functionality from IbftController and IbftBlockHeightManager (#952) --- .../ibft/tests/FutureHeightTest.java | 6 -- .../consensus/ibft/tests/FutureRoundTest.java | 6 -- .../consensus/ibft/tests/GossipTest.java | 1 - .../ibft/tests/LocalNodeIsProposerTest.java | 3 +- .../ibft/tests/LocalNodeNotProposerTest.java | 2 - .../tests/ReceivedFutureProposalTest.java | 6 -- .../consensus/ibft/tests/RoundChangeTest.java | 6 -- .../ibft/tests/SpuriousBehaviourTest.java | 1 - .../ibft/statemachine/BlockHeightManager.java | 2 - .../statemachine/IbftBlockHeightManager.java | 9 +-- .../ibft/statemachine/IbftController.java | 5 -- .../ibft/statemachine/IbftRound.java | 6 +- .../ibft/statemachine/IbftRoundFactory.java | 3 +- .../statemachine/NoOpBlockHeightManager.java | 3 - .../IbftBlockHeightManagerTest.java | 15 ++--- .../ibft/statemachine/IbftControllerTest.java | 63 ++++++++---------- .../ibft/statemachine/IbftRoundTest.java | 52 +++++++++++---- .../eth/sync/BlockPropagationManager.java | 64 +++++++++++-------- .../controller/IbftPantheonController.java | 1 - 19 files changed, 118 insertions(+), 136 deletions(-) diff --git a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/tests/FutureHeightTest.java b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/tests/FutureHeightTest.java index d02d30f823..4561459ce2 100644 --- a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/tests/FutureHeightTest.java +++ b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/tests/FutureHeightTest.java @@ -31,7 +31,6 @@ import java.time.Instant; import java.time.ZoneId; -import org.junit.Before; import org.junit.Test; public class FutureHeightTest { @@ -57,11 +56,6 @@ public class FutureHeightTest { private final MessageFactory localNodeMessageFactory = context.getLocalNodeMessageFactory(); - @Before - public void setup() { - context.getController().start(); - } - @Test public void messagesForFutureHeightAreBufferedUntilChainHeightCatchesUp() { final Block currentHeightBlock = context.createBlockForProposalFromChainHead(0, 30); diff --git a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/tests/FutureRoundTest.java b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/tests/FutureRoundTest.java index bab80ec658..84ecf6aba9 100644 --- a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/tests/FutureRoundTest.java +++ b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/tests/FutureRoundTest.java @@ -30,7 +30,6 @@ import java.time.Instant; import java.time.ZoneId; -import org.junit.Before; import org.junit.Test; public class FutureRoundTest { @@ -57,11 +56,6 @@ public class FutureRoundTest { private final MessageFactory localNodeMessageFactory = context.getLocalNodeMessageFactory(); - @Before - public void setup() { - context.getController().start(); - } - @Test public void messagesForFutureRoundAreNotActionedUntilRoundIsActive() { final Block futureBlock = diff --git a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/tests/GossipTest.java b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/tests/GossipTest.java index fd3dff3149..3dc94e7c4a 100644 --- a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/tests/GossipTest.java +++ b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/tests/GossipTest.java @@ -64,7 +64,6 @@ public class GossipTest { @Before public void setup() { - context.getController().start(); block = context.createBlockForProposalFromChainHead(roundId.getRoundNumber(), 30); sender = peers.getProposer(); msgFactory = sender.getMessageFactory(); diff --git a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/tests/LocalNodeIsProposerTest.java b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/tests/LocalNodeIsProposerTest.java index 24bb0d9014..ef2677c8de 100644 --- a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/tests/LocalNodeIsProposerTest.java +++ b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/tests/LocalNodeIsProposerTest.java @@ -72,8 +72,7 @@ public void setup() { createSignedCommitPayload( roundId, expectedProposedBlock, context.getLocalNodeParams().getNodeKeyPair())); - // Start the Controller, and trigger "block timer" to send proposal. - context.getController().start(); + // Trigger "block timer" to send proposal. context.getController().handleBlockTimerExpiry(new BlockTimerExpiry(roundId)); } diff --git a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/tests/LocalNodeNotProposerTest.java b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/tests/LocalNodeNotProposerTest.java index d905ee3e69..668a643ddc 100644 --- a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/tests/LocalNodeNotProposerTest.java +++ b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/tests/LocalNodeNotProposerTest.java @@ -56,8 +56,6 @@ public void setup() { new Commit( createSignedCommitPayload( roundId, blockToPropose, context.getLocalNodeParams().getNodeKeyPair())); - - context.getController().start(); } @Test diff --git a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/tests/ReceivedFutureProposalTest.java b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/tests/ReceivedFutureProposalTest.java index 2b43599e1e..1ac1f207bc 100644 --- a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/tests/ReceivedFutureProposalTest.java +++ b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/tests/ReceivedFutureProposalTest.java @@ -31,7 +31,6 @@ import java.util.List; -import org.junit.Before; import org.junit.Test; /** @@ -53,11 +52,6 @@ public class ReceivedFutureProposalTest { private final MessageFactory localNodeMessageFactory = context.getLocalNodeMessageFactory(); - @Before - public void setup() { - context.getController().start(); - } - @Test public void proposalWithEmptyPrepareCertificatesOfferNewBlock() { final ConsensusRoundIdentifier nextRoundId = new ConsensusRoundIdentifier(1, 1); diff --git a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/tests/RoundChangeTest.java b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/tests/RoundChangeTest.java index 65558ba82c..a02d1600fa 100644 --- a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/tests/RoundChangeTest.java +++ b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/tests/RoundChangeTest.java @@ -40,7 +40,6 @@ import java.util.Optional; import com.google.common.collect.Lists; -import org.junit.Before; import org.junit.Test; public class RoundChangeTest { @@ -65,11 +64,6 @@ public class RoundChangeTest { private final Block blockToPropose = context.createBlockForProposalFromChainHead(0, 15); - @Before - public void setup() { - context.getController().start(); - } - @Test public void onRoundChangeTimerExpiryEventRoundChangeMessageIsSent() { diff --git a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/tests/SpuriousBehaviourTest.java b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/tests/SpuriousBehaviourTest.java index d70743d8a3..3abb6d1b6a 100644 --- a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/tests/SpuriousBehaviourTest.java +++ b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/tests/SpuriousBehaviourTest.java @@ -67,7 +67,6 @@ public class SpuriousBehaviourTest { @Before public void setup() { - context.getController().start(); expectedPrepare = context.getLocalNodeMessageFactory().createPrepare(roundId, proposedBlock.getHash()); diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/BlockHeightManager.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/BlockHeightManager.java index ff0d78343e..2740033e55 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/BlockHeightManager.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/BlockHeightManager.java @@ -22,8 +22,6 @@ public interface BlockHeightManager { - void start(); - void handleBlockTimerExpiry(ConsensusRoundIdentifier roundIdentifier); void roundExpired(RoundExpiry expire); diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftBlockHeightManager.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftBlockHeightManager.java index aec650b751..0f753d1bf9 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftBlockHeightManager.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftBlockHeightManager.java @@ -18,7 +18,6 @@ import tech.pegasys.pantheon.consensus.ibft.BlockTimer; import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier; -import tech.pegasys.pantheon.consensus.ibft.RoundTimer; import tech.pegasys.pantheon.consensus.ibft.ibftevent.RoundExpiry; import tech.pegasys.pantheon.consensus.ibft.messagewrappers.Commit; import tech.pegasys.pantheon.consensus.ibft.messagewrappers.IbftMessage; @@ -58,7 +57,6 @@ public class IbftBlockHeightManager implements BlockHeightManager { private final IbftRoundFactory roundFactory; private final RoundChangeManager roundChangeManager; private final BlockHeader parentHeader; - private final RoundTimer roundTimer; private final BlockTimer blockTimer; private final IbftMessageTransmitter transmitter; private final MessageFactory messageFactory; @@ -81,7 +79,6 @@ public IbftBlockHeightManager( final MessageValidatorFactory messageValidatorFactory) { this.parentHeader = parentHeader; this.roundFactory = ibftRoundFactory; - this.roundTimer = finalState.getRoundTimer(); this.blockTimer = finalState.getBlockTimer(); this.transmitter = finalState.getTransmitter(); this.messageFactory = finalState.getMessageFactory(); @@ -99,11 +96,8 @@ public IbftBlockHeightManager( roundIdentifier, finalState.getQuorum(), messageValidatorFactory.createMessageValidator(roundIdentifier, parentHeader)); - } - @Override - public void start() { - startNewRound(0); + currentRound = roundFactory.createNewRound(parentHeader, 0); if (finalState.isLocalNodeProposerForRound(currentRound.getRoundIdentifier())) { blockTimer.startTimer(currentRound.getRoundIdentifier(), parentHeader); } @@ -247,7 +241,6 @@ private void startNewRound(final int roundNumber) { } // discard roundChange messages from the current and previous rounds roundChangeManager.discardRoundsPriorTo(currentRound.getRoundIdentifier()); - roundTimer.startTimer(currentRound.getRoundIdentifier()); } @Override diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftController.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftController.java index 556a3d3739..bcc3b2f9d9 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftController.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftController.java @@ -40,7 +40,6 @@ public class IbftController { private static final Logger LOG = LogManager.getLogger(); - private final Blockchain blockchain; private final IbftFinalState ibftFinalState; private final IbftBlockHeightManagerFactory ibftBlockHeightManagerFactory; private final FutureMessageBuffer futureMessageBuffer; @@ -57,16 +56,13 @@ public IbftController( final MessageTracker duplicateMessageTracker, final FutureMessageBuffer futureMessageBuffer, final SynchronizerUpdater sychronizerUpdater) { - this.blockchain = blockchain; this.ibftFinalState = ibftFinalState; this.ibftBlockHeightManagerFactory = ibftBlockHeightManagerFactory; this.futureMessageBuffer = futureMessageBuffer; this.gossiper = gossiper; this.duplicateMessageTracker = duplicateMessageTracker; this.sychronizerUpdater = sychronizerUpdater; - } - public void start() { startNewHeightManager(blockchain.getChainHeadHeader()); } @@ -183,7 +179,6 @@ public void handleRoundExpiry(final RoundExpiry roundExpiry) { private void startNewHeightManager(final BlockHeader parentHeader) { currentHeightManager = ibftBlockHeightManagerFactory.create(parentHeader); - currentHeightManager.start(); final long newChainHeight = currentHeightManager.getChainHeight(); futureMessageBuffer.retrieveMessagesForHeight(newChainHeight).forEach(this::handleMessage); } diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftRound.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftRound.java index 7fbc158959..e55a92391d 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftRound.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftRound.java @@ -18,6 +18,7 @@ import tech.pegasys.pantheon.consensus.ibft.IbftContext; import tech.pegasys.pantheon.consensus.ibft.IbftExtraData; import tech.pegasys.pantheon.consensus.ibft.IbftHelpers; +import tech.pegasys.pantheon.consensus.ibft.RoundTimer; import tech.pegasys.pantheon.consensus.ibft.blockcreation.IbftBlockCreator; import tech.pegasys.pantheon.consensus.ibft.messagewrappers.Commit; import tech.pegasys.pantheon.consensus.ibft.messagewrappers.Prepare; @@ -63,7 +64,8 @@ public IbftRound( final Subscribers observers, final KeyPair nodeKeys, final MessageFactory messageFactory, - final IbftMessageTransmitter transmitter) { + final IbftMessageTransmitter transmitter, + final RoundTimer roundTimer) { this.roundState = roundState; this.blockCreator = blockCreator; this.protocolContext = protocolContext; @@ -72,6 +74,8 @@ public IbftRound( this.nodeKeys = nodeKeys; this.messageFactory = messageFactory; this.transmitter = transmitter; + + roundTimer.startTimer(getRoundIdentifier()); } public ConsensusRoundIdentifier getRoundIdentifier() { diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftRoundFactory.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftRoundFactory.java index d6dad5fdef..1ef30d8216 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftRoundFactory.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftRoundFactory.java @@ -73,6 +73,7 @@ public IbftRound createNewRoundWithState( minedBlockObservers, finalState.getNodeKeys(), finalState.getMessageFactory(), - finalState.getTransmitter()); + finalState.getTransmitter(), + finalState.getRoundTimer()); } } diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/NoOpBlockHeightManager.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/NoOpBlockHeightManager.java index 5714079d47..1df97e9dd4 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/NoOpBlockHeightManager.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/NoOpBlockHeightManager.java @@ -28,9 +28,6 @@ public NoOpBlockHeightManager(final BlockHeader parentHeader) { this.parentHeader = parentHeader; } - @Override - public void start() {} - @Override public void handleBlockTimerExpiry(final ConsensusRoundIdentifier roundIdentifier) {} diff --git a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftBlockHeightManagerTest.java b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftBlockHeightManagerTest.java index 1c38622239..8c16643465 100644 --- a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftBlockHeightManagerTest.java +++ b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftBlockHeightManagerTest.java @@ -131,7 +131,6 @@ public void setup() { when(messageValidator.validatePrepare(any())).thenReturn(true); when(finalState.getTransmitter()).thenReturn(messageTransmitter); when(finalState.getBlockTimer()).thenReturn(blockTimer); - when(finalState.getRoundTimer()).thenReturn(roundTimer); when(finalState.getQuorum()).thenReturn(3); when(finalState.getMessageFactory()).thenReturn(messageFactory); when(blockCreator.createBlock(anyLong())).thenReturn(createdBlock); @@ -159,7 +158,8 @@ public void setup() { new Subscribers<>(), localNodeKeys, messageFactory, - messageTransmitter); + messageTransmitter, + roundTimer); }); when(roundFactory.createNewRoundWithState(any(), any())) @@ -174,7 +174,8 @@ public void setup() { new Subscribers<>(), localNodeKeys, messageFactory, - messageTransmitter); + messageTransmitter, + roundTimer); }); } @@ -190,7 +191,6 @@ public void startsABlockTimerOnStartIfLocalNodeIsTheProoserForRound() { roundFactory, clock, messageValidatorFactory); - manager.start(); verify(blockTimer, times(1)).startTimer(any(), any()); } @@ -205,7 +205,6 @@ public void onBlockTimerExpiryProposalMessageIsTransmitted() { roundFactory, clock, messageValidatorFactory); - manager.start(); manager.handleBlockTimerExpiry(roundIdentifier); verify(messageTransmitter, times(1)).multicastProposal(eq(roundIdentifier), any(), any()); @@ -230,7 +229,6 @@ public void onRoundChangeReceptionRoundChangeManagerIsInvokedAndNewRoundStarted( roundFactory, clock, messageValidatorFactory); - manager.start(); verify(roundFactory).createNewRound(any(), eq(0)); manager.handleRoundChangePayload(roundChange); @@ -250,7 +248,6 @@ public void onRoundTimerExpiryANewRoundIsCreatedWithAnIncrementedRoundNumber() { roundFactory, clock, messageValidatorFactory); - manager.start(); verify(roundFactory).createNewRound(any(), eq(0)); manager.roundExpired(new RoundExpiry(roundIdentifier)); @@ -277,7 +274,6 @@ public void whenSufficientRoundChangesAreReceivedAProposalMessageIsTransmitted() roundFactory, clock, messageValidatorFactory); - manager.start(); reset(messageTransmitter); manager.handleRoundChangePayload(roundChange); @@ -298,7 +294,6 @@ public void messagesForFutureRoundsAreBufferedAndUsedToPreloadNewRoundWhenItIsSt roundFactory, clock, messageValidatorFactory); - manager.start(); final Prepare prepare = validatorMessageFactory @@ -339,7 +334,6 @@ public void preparedCertificateIncludedInRoundChangeMessageOnRoundTimeoutExpired roundFactory, clock, messageValidatorFactory); - manager.start(); manager.handleBlockTimerExpiry(roundIdentifier); // Trigger a Proposal creation. final Prepare firstPrepare = @@ -388,7 +382,6 @@ public void illegalFutureRoundProposalDoesNotTriggerNewRound() { futureRoundIdentifier, createdBlock, Optional.of(new RoundChangeCertificate(Collections.emptyList()))); - manager.start(); reset(roundFactory); // Discard the existing createNewRound invocation. manager.handleProposalPayload(futureRoundProposal); diff --git a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftControllerTest.java b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftControllerTest.java index 1c36026888..b259cf0555 100644 --- a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftControllerTest.java +++ b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftControllerTest.java @@ -96,15 +96,6 @@ public void setup() { when(blockChain.getChainHeadHeader()).thenReturn(chainHeadBlockHeader); when(blockHeightManagerFactory.create(any())).thenReturn(blockHeightManager); when(ibftFinalState.getValidators()).thenReturn(ImmutableList.of(validator)); - ibftController = - new IbftController( - blockChain, - ibftFinalState, - blockHeightManagerFactory, - ibftGossip, - messageTracker, - futureMessageBuffer, - mock(EthSynchronizerUpdater.class)); when(chainHeadBlockHeader.getNumber()).thenReturn(1L); when(chainHeadBlockHeader.getHash()).thenReturn(Hash.ZERO); @@ -117,9 +108,21 @@ public void setup() { when(messageTracker.hasSeenMessage(any())).thenReturn(false); } + private void constructIbftController() { + ibftController = + new IbftController( + blockChain, + ibftFinalState, + blockHeightManagerFactory, + ibftGossip, + messageTracker, + futureMessageBuffer, + mock(EthSynchronizerUpdater.class)); + } + @Test public void createsNewBlockHeightManagerWhenStarted() { - ibftController.start(); + constructIbftController(); verify(futureMessageBuffer, never()).addMessage(anyLong(), any()); verify(blockHeightManagerFactory).create(chainHeadBlockHeader); } @@ -137,12 +140,12 @@ public void startsNewBlockHeightManagerAndReplaysFutureMessages() { when(blockHeightManager.getChainHeight()).thenReturn(2L); when(futureMessageBuffer.retrieveMessagesForHeight(2L)).thenReturn(height2Msgs); - ibftController.start(); + constructIbftController(); + verify(futureMessageBuffer).retrieveMessagesForHeight(2L); verify(futureMessageBuffer, never()).retrieveMessagesForHeight(3L); verify(blockHeightManagerFactory).create(chainHeadBlockHeader); verify(blockHeightManager, atLeastOnce()).getChainHeight(); - verify(blockHeightManager).start(); verify(blockHeightManager, never()).handleProposalPayload(proposal); verify(blockHeightManager).handlePreparePayload(prepare); verify(ibftGossip).send(prepareMessage); @@ -165,13 +168,12 @@ public void createsNewBlockHeightManagerAndReplaysFutureMessagesOnNewChainHeadEv .thenReturn(emptyList()); when(blockHeightManager.getChainHeight()).thenReturn(2L); - ibftController.start(); + constructIbftController(); final NewChainHead newChainHead = new NewChainHead(nextBlock); ibftController.handleNewBlockEvent(newChainHead); verify(blockHeightManagerFactory).create(nextBlock); verify(blockHeightManager, atLeastOnce()).getChainHeight(); - verify(blockHeightManager, times(2)).start(); // once at beginning, and again on newChainHead. verify(futureMessageBuffer, times(2)).retrieveMessagesForHeight(2L); verify(blockHeightManager).handleProposalPayload(proposal); verify(ibftGossip).send(proposalMessage); @@ -185,28 +187,25 @@ public void createsNewBlockHeightManagerAndReplaysFutureMessagesOnNewChainHeadEv @Test public void newBlockForCurrentOrPreviousHeightTriggersNoChange() { - ibftController.start(); - + constructIbftController(); long chainHeadHeight = chainHeadBlockHeader.getNumber(); when(nextBlock.getNumber()).thenReturn(chainHeadHeight); when(nextBlock.getHash()).thenReturn(Hash.ZERO); final NewChainHead sameHeightBlock = new NewChainHead(nextBlock); ibftController.handleNewBlockEvent(sameHeightBlock); verify(blockHeightManagerFactory, times(1)).create(any()); // initial creation - verify(blockHeightManager, times(1)).start(); // the initial call at start of test. when(nextBlock.getNumber()).thenReturn(chainHeadHeight - 1); final NewChainHead priorBlock = new NewChainHead(nextBlock); ibftController.handleNewBlockEvent(priorBlock); verify(blockHeightManagerFactory, times(1)).create(any()); - verify(blockHeightManager, times(1)).start(); } @Test public void handlesRoundExpiry() { final RoundExpiry roundExpiry = new RoundExpiry(roundIdentifier); - ibftController.start(); + constructIbftController(); ibftController.handleRoundExpiry(roundExpiry); verify(blockHeightManager).roundExpired(roundExpiry); @@ -216,7 +215,7 @@ public void handlesRoundExpiry() { public void handlesBlockTimerExpiry() { final BlockTimerExpiry blockTimerExpiry = new BlockTimerExpiry(roundIdentifier); - ibftController.start(); + constructIbftController(); ibftController.handleBlockTimerExpiry(blockTimerExpiry); verify(blockHeightManager).handleBlockTimerExpiry(roundIdentifier); @@ -225,42 +224,39 @@ public void handlesBlockTimerExpiry() { @Test public void proposalForCurrentHeightIsPassedToBlockHeightManager() { setupProposal(roundIdentifier, validator); - ibftController.start(); + constructIbftController(); ibftController.handleMessageEvent(new IbftReceivedMessageEvent(proposalMessage)); verify(futureMessageBuffer, never()).addMessage(anyLong(), any()); verify(blockHeightManager).handleProposalPayload(proposal); verify(ibftGossip).send(proposalMessage); verify(blockHeightManager, atLeastOnce()).getChainHeight(); - verify(blockHeightManager).start(); verifyNoMoreInteractions(blockHeightManager); } @Test public void prepareForCurrentHeightIsPassedToBlockHeightManager() { setupPrepare(roundIdentifier, validator); - ibftController.start(); + constructIbftController(); ibftController.handleMessageEvent(new IbftReceivedMessageEvent(prepareMessage)); verify(futureMessageBuffer, never()).addMessage(anyLong(), any()); verify(blockHeightManager).handlePreparePayload(prepare); verify(ibftGossip).send(prepareMessage); verify(blockHeightManager, atLeastOnce()).getChainHeight(); - verify(blockHeightManager).start(); verifyNoMoreInteractions(blockHeightManager); } @Test public void commitForCurrentHeightIsPassedToBlockHeightManager() { setupCommit(roundIdentifier, validator); - ibftController.start(); + constructIbftController(); ibftController.handleMessageEvent(new IbftReceivedMessageEvent(commitMessage)); verify(futureMessageBuffer, never()).addMessage(anyLong(), any()); verify(blockHeightManager).handleCommitPayload(commit); verify(ibftGossip).send(commitMessage); verify(blockHeightManager, atLeastOnce()).getChainHeight(); - verify(blockHeightManager).start(); verifyNoMoreInteractions(blockHeightManager); } @@ -268,14 +264,13 @@ public void commitForCurrentHeightIsPassedToBlockHeightManager() { public void roundChangeForCurrentHeightIsPassedToBlockHeightManager() { roundIdentifier = new ConsensusRoundIdentifier(0, 1); setupRoundChange(roundIdentifier, validator); - ibftController.start(); + constructIbftController(); ibftController.handleMessageEvent(new IbftReceivedMessageEvent(roundChangeMessage)); verify(futureMessageBuffer, never()).addMessage(anyLong(), any()); verify(blockHeightManager).handleRoundChangePayload(roundChange); verify(ibftGossip).send(roundChangeMessage); verify(blockHeightManager, atLeastOnce()).getChainHeight(); - verify(blockHeightManager).start(); verifyNoMoreInteractions(blockHeightManager); } @@ -311,7 +306,7 @@ public void roundChangeForPastHeightIsDiscarded() { public void roundExpiryForPastHeightIsDiscarded() { final RoundExpiry roundExpiry = new RoundExpiry(roundIdentifier); when(blockHeightManager.getChainHeight()).thenReturn(1L); - ibftController.start(); + constructIbftController(); ibftController.handleRoundExpiry(roundExpiry); verify(futureMessageBuffer, never()).addMessage(anyLong(), any()); verify(blockHeightManager, never()).roundExpired(any()); @@ -321,7 +316,7 @@ public void roundExpiryForPastHeightIsDiscarded() { public void blockTimerForPastHeightIsDiscarded() { final BlockTimerExpiry blockTimerExpiry = new BlockTimerExpiry(roundIdentifier); when(blockHeightManager.getChainHeight()).thenReturn(1L); - ibftController.start(); + constructIbftController(); ibftController.handleBlockTimerExpiry(blockTimerExpiry); verify(futureMessageBuffer, never()).addMessage(anyLong(), any()); verify(blockHeightManager, never()).handleBlockTimerExpiry(any()); @@ -387,29 +382,27 @@ public void duplicatedMessagesAreNotProcessed() { public void uniqueMessagesAreAddedAsSeen() { when(messageTracker.hasSeenMessage(proposalMessageData)).thenReturn(false); setupProposal(roundIdentifier, validator); - ibftController.start(); + constructIbftController(); ibftController.handleMessageEvent(new IbftReceivedMessageEvent(proposalMessage)); verify(messageTracker).addSeenMessage(proposalMessageData); } private void verifyNotHandledAndNoFutureMsgs(final IbftReceivedMessageEvent msg) { - ibftController.start(); + constructIbftController(); ibftController.handleMessageEvent(msg); verify(futureMessageBuffer, never()).addMessage(anyLong(), any()); verify(blockHeightManager, atLeastOnce()).getChainHeight(); - verify(blockHeightManager).start(); verifyNoMoreInteractions(blockHeightManager); } private void verifyHasFutureMessages(final long msgHeight, final Message message) { - ibftController.start(); + constructIbftController(); ibftController.handleMessageEvent(new IbftReceivedMessageEvent(message)); verify(futureMessageBuffer).addMessage(msgHeight, message); verify(blockHeightManager, atLeastOnce()).getChainHeight(); - verify(blockHeightManager).start(); verifyNoMoreInteractions(blockHeightManager); } diff --git a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftRoundTest.java b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftRoundTest.java index e2c097fab8..18b5474d0a 100644 --- a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftRoundTest.java +++ b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftRoundTest.java @@ -28,6 +28,7 @@ import tech.pegasys.pantheon.consensus.ibft.IbftBlockHashing; import tech.pegasys.pantheon.consensus.ibft.IbftContext; import tech.pegasys.pantheon.consensus.ibft.IbftExtraData; +import tech.pegasys.pantheon.consensus.ibft.RoundTimer; import tech.pegasys.pantheon.consensus.ibft.blockcreation.IbftBlockCreator; import tech.pegasys.pantheon.consensus.ibft.network.IbftMessageTransmitter; import tech.pegasys.pantheon.consensus.ibft.payload.MessageFactory; @@ -79,6 +80,7 @@ public class IbftRoundTest { @Mock private MinedBlockObserver minedBlockObserver; @Mock private IbftBlockCreator blockCreator; @Mock private MessageValidator messageValidator; + @Mock private RoundTimer roundTimer; @Captor private ArgumentCaptor> payloadArgCaptor; @Captor private ArgumentCaptor blockCaptor; @@ -115,6 +117,23 @@ public void setup() { subscribers.subscribe(minedBlockObserver); } + @Test + public void onConstructionRoundTimerIsStarted() { + final RoundState roundState = new RoundState(roundIdentifier, 3, messageValidator); + final IbftRound round = + new IbftRound( + roundState, + blockCreator, + protocolContext, + blockImporter, + subscribers, + localNodeKeys, + messageFactory, + transmitter, + roundTimer); + verify(roundTimer, times(1)).startTimer(roundIdentifier); + } + @Test public void onReceptionOfValidProposalSendsAPrepareToNetworkPeers() { final RoundState roundState = new RoundState(roundIdentifier, 3, messageValidator); @@ -127,7 +146,8 @@ public void onReceptionOfValidProposalSendsAPrepareToNetworkPeers() { subscribers, localNodeKeys, messageFactory, - transmitter); + transmitter, + roundTimer); round.handleProposalMessage( messageFactory.createProposal(roundIdentifier, proposedBlock, Optional.empty())); @@ -147,7 +167,8 @@ public void sendsAProposalWhenRequested() { subscribers, localNodeKeys, messageFactory, - transmitter); + transmitter, + roundTimer); round.createAndSendProposalMessage(15); verify(transmitter, times(1)) @@ -168,7 +189,8 @@ public void singleValidatorImportBlocksImmediatelyOnProposalCreation() { subscribers, localNodeKeys, messageFactory, - transmitter); + transmitter, + roundTimer); round.createAndSendProposalMessage(15); verify(transmitter, times(1)) .multicastProposal(roundIdentifier, proposedBlock, Optional.empty()); @@ -189,7 +211,8 @@ public void twoValidatorNetworkSendsPrepareOnProposalReceptionThenSendsCommitOnC subscribers, localNodeKeys, messageFactory, - transmitter); + transmitter, + roundTimer); final Hash commitSealHash = IbftBlockHashing.calculateDataHashForCommittedSeal( @@ -231,7 +254,8 @@ public void localNodeProposesToNetworkOfTwoValidatorsImportsOnReceptionOfCommitF subscribers, localNodeKeys, messageFactory, - transmitter); + transmitter, + roundTimer); final Hash commitSealHash = IbftBlockHashing.calculateDataHashForCommittedSeal( @@ -266,7 +290,8 @@ public void aProposalWithAnewBlockIsSentUponReceptionOfARoundChangeWithNoCertifi subscribers, localNodeKeys, messageFactory, - transmitter); + transmitter, + roundTimer); final RoundChangeCertificate roundChangeCertificate = new RoundChangeCertificate(emptyList()); @@ -288,7 +313,8 @@ public void aProposalMessageWithTheSameBlockIsSentUponReceptionOfARoundChangeWit subscribers, localNodeKeys, messageFactory, - transmitter); + transmitter, + roundTimer); final RoundChangeArtifacts roundChangeArtifacts = RoundChangeArtifacts.create( @@ -333,7 +359,8 @@ public void creatingNewBlockFromEmptyPreparedCertificateUpdatesInternalState() { subscribers, localNodeKeys, messageFactory, - transmitter); + transmitter, + roundTimer); final RoundChangeArtifacts roundChangeArtifacts = RoundChangeArtifacts.create( @@ -365,7 +392,8 @@ public void creatingNewBlockNotifiesBlockMiningObservers() { subscribers, localNodeKeys, messageFactory, - transmitter); + transmitter, + roundTimer); round.createAndSendProposalMessage(15); verify(minedBlockObserver).blockMined(any()); } @@ -384,7 +412,8 @@ public void blockIsOnlyImportedOnceWhenCommitsAreReceivedBeforeProposal() { subscribers, localNodeKeys, messageFactory, - transmitter); + transmitter, + roundTimer); round.handleCommitMessage( messageFactory.createCommit(roundIdentifier, proposedBlock.getHash(), remoteCommitSeal)); @@ -408,7 +437,8 @@ public void blockIsImportedOnlyOnceIfQuorumCommitsAreReceivedPriorToProposal() { subscribers, localNodeKeys, messageFactory, - transmitter); + transmitter, + roundTimer); round.handleCommitMessage( messageFactory.createCommit(roundIdentifier, proposedBlock.getHash(), remoteCommitSeal)); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java index 9dbd297cc9..45d4598c04 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java @@ -109,32 +109,6 @@ private void setupListeners() { .subscribe(EthPV62.NEW_BLOCK_HASHES, this::handleNewBlockHashesFromNetwork); } - private void validateAndBroadcastBlock(final Block block) { - final ProtocolSpec protocolSpec = - protocolSchedule.getByBlockNumber(block.getHeader().getNumber()); - final BlockHeaderValidator blockHeaderValidator = protocolSpec.getBlockHeaderValidator(); - final BlockHeader parent = - protocolContext - .getBlockchain() - .getBlockHeader(block.getHeader().getParentHash()) - .orElseThrow( - () -> - new IllegalArgumentException( - "Incapable of retrieving header from non-existent parent of " - + block.getHeader().getNumber() - + ".")); - if (blockHeaderValidator.validateHeader( - block.getHeader(), parent, protocolContext, HeaderValidationMode.FULL)) { - final UInt256 totalDifficulty = - protocolContext - .getBlockchain() - .getTotalDifficultyByHash(parent.getHash()) - .get() - .plus(block.getHeader().getDifficulty()); - blockBroadcaster.propagate(block, totalDifficulty); - } - } - private void onBlockAdded(final BlockAddedEvent blockAddedEvent, final Blockchain blockchain) { // Check to see if any of our pending blocks are now ready for import final Block newBlock = blockAddedEvent.getBlock(); @@ -263,6 +237,24 @@ private CompletableFuture processAnnouncedBlock( return getBlockTask.run().thenCompose((r) -> importOrSavePendingBlock(r.getResult())); } + private boolean validateHeaded(final Block block, final BlockHeader parent) { + final ProtocolSpec protocolSpec = + protocolSchedule.getByBlockNumber(block.getHeader().getNumber()); + final BlockHeaderValidator blockHeaderValidator = protocolSpec.getBlockHeaderValidator(); + return blockHeaderValidator.validateHeader( + block.getHeader(), parent, protocolContext, HeaderValidationMode.FULL); + } + + private void broadcastBlock(final Block block, final BlockHeader parent) { + final UInt256 totalDifficulty = + protocolContext + .getBlockchain() + .getTotalDifficultyByHash(parent.getHash()) + .get() + .plus(block.getHeader().getDifficulty()); + blockBroadcaster.propagate(block, totalDifficulty); + } + @VisibleForTesting CompletableFuture importOrSavePendingBlock(final Block block) { // Synchronize to avoid race condition where block import event fires after the @@ -292,12 +284,28 @@ CompletableFuture importOrSavePendingBlock(final Block block) { return CompletableFuture.completedFuture(block); } - ethContext.getScheduler().scheduleSyncWorkerTask(() -> validateAndBroadcastBlock(block)); + final BlockHeader parent = + protocolContext + .getBlockchain() + .getBlockHeader(block.getHeader().getParentHash()) + .orElseThrow( + () -> + new IllegalArgumentException( + "Incapable of retrieving header from non-existent parent of " + + block.getHeader().getNumber() + + ".")); + + if (!validateHeaded(block, parent)) { + throw new IllegalArgumentException("Invalid block: " + block.getHeader().getNumber()); + } + + ethContext.getScheduler().scheduleSyncWorkerTask(() -> broadcastBlock(block, parent)); // Import block final PersistBlockTask importTask = PersistBlockTask.create( - protocolSchedule, protocolContext, block, HeaderValidationMode.FULL, metricsSystem); + protocolSchedule, protocolContext, block, HeaderValidationMode.NONE, metricsSystem); + return ethContext .getScheduler() .scheduleSyncWorkerTask(importTask::run) diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java index 604f0feaef..67d629f868 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java @@ -265,7 +265,6 @@ public static PantheonController init( duplicateMessageTracker, futureMessageBuffer, new EthSynchronizerUpdater(ethContext.getEthPeers())); - ibftController.start(); final EventMultiplexer eventMultiplexer = new EventMultiplexer(ibftController); final IbftProcessor ibftProcessor = new IbftProcessor(ibftEventQueue, eventMultiplexer); From 9924c3378aafd705c62616120a97c88750058e29 Mon Sep 17 00:00:00 2001 From: "S. Matthew English" Date: Mon, 25 Feb 2019 14:00:12 -0500 Subject: [PATCH 2/8] update --- .../ethereum/eth/sync/BlockPropagationManager.java | 12 +++++++----- .../eth/sync/BlockPropagationManagerTest.java | 1 + 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java index 45d4598c04..67f7c7f5c0 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java @@ -237,7 +237,7 @@ private CompletableFuture processAnnouncedBlock( return getBlockTask.run().thenCompose((r) -> importOrSavePendingBlock(r.getResult())); } - private boolean validateHeaded(final Block block, final BlockHeader parent) { + private boolean validateHeader(final Block block, final BlockHeader parent) { final ProtocolSpec protocolSpec = protocolSchedule.getByBlockNumber(block.getHeader().getNumber()); final BlockHeaderValidator blockHeaderValidator = protocolSpec.getBlockHeaderValidator(); @@ -295,16 +295,18 @@ CompletableFuture importOrSavePendingBlock(final Block block) { + block.getHeader().getNumber() + ".")); - if (!validateHeaded(block, parent)) { + AtomicBoolean isValid = new AtomicBoolean(false); + ethContext.getScheduler().scheduleSyncWorkerTask(() -> { + isValid.getAndSet(validateHeader(block, parent)); + }); + if (!isValid.get()) { throw new IllegalArgumentException("Invalid block: " + block.getHeader().getNumber()); } ethContext.getScheduler().scheduleSyncWorkerTask(() -> broadcastBlock(block, parent)); // Import block - final PersistBlockTask importTask = - PersistBlockTask.create( - protocolSchedule, protocolContext, block, HeaderValidationMode.NONE, metricsSystem); + final PersistBlockTask importTask = PersistBlockTask.create(protocolSchedule, protocolContext, block, HeaderValidationMode.NONE, metricsSystem); return ethContext .getScheduler() diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManagerTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManagerTest.java index 1a6c66bed3..1623f79836 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManagerTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManagerTest.java @@ -543,6 +543,7 @@ public void shouldNotImportBlocksThatAreAlreadyBeingImported() { final EthScheduler ethScheduler = mock(EthScheduler.class); when(ethScheduler.scheduleSyncWorkerTask(any(Supplier.class))) .thenReturn(new CompletableFuture<>()); + final EthContext ethContext = new EthContext("eth", new EthPeers("eth"), new EthMessages(), ethScheduler); final BlockPropagationManager blockPropagationManager = From 06cceab348e56e6d736d1d08e8629f78af3ca345 Mon Sep 17 00:00:00 2001 From: "S. Matthew English" Date: Mon, 25 Feb 2019 14:08:43 -0500 Subject: [PATCH 3/8] update II --- .../ethereum/eth/sync/BlockPropagationManager.java | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java index 67f7c7f5c0..12b3bfceb8 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java @@ -237,12 +237,10 @@ private CompletableFuture processAnnouncedBlock( return getBlockTask.run().thenCompose((r) -> importOrSavePendingBlock(r.getResult())); } - private boolean validateHeader(final Block block, final BlockHeader parent) { - final ProtocolSpec protocolSpec = - protocolSchedule.getByBlockNumber(block.getHeader().getNumber()); + private CompletableFuture validateHeader(final Block block, final BlockHeader parent) { + final ProtocolSpec protocolSpec = protocolSchedule.getByBlockNumber(block.getHeader().getNumber()); final BlockHeaderValidator blockHeaderValidator = protocolSpec.getBlockHeaderValidator(); - return blockHeaderValidator.validateHeader( - block.getHeader(), parent, protocolContext, HeaderValidationMode.FULL); + return ethContext.getScheduler().scheduleComputationTask(() -> blockHeaderValidator.validateHeader(block.getHeader(), parent, protocolContext, HeaderValidationMode.FULL)); } private void broadcastBlock(final Block block, final BlockHeader parent) { @@ -255,6 +253,7 @@ private void broadcastBlock(final Block block, final BlockHeader parent) { blockBroadcaster.propagate(block, totalDifficulty); } + @SuppressWarnings("LocalCanBeFinal") @VisibleForTesting CompletableFuture importOrSavePendingBlock(final Block block) { // Synchronize to avoid race condition where block import event fires after the @@ -295,10 +294,7 @@ CompletableFuture importOrSavePendingBlock(final Block block) { + block.getHeader().getNumber() + ".")); - AtomicBoolean isValid = new AtomicBoolean(false); - ethContext.getScheduler().scheduleSyncWorkerTask(() -> { - isValid.getAndSet(validateHeader(block, parent)); - }); + if (!isValid.get()) { throw new IllegalArgumentException("Invalid block: " + block.getHeader().getNumber()); } From 4b3c55425bcb2b62db99caec45cdbcfb688ec5ba Mon Sep 17 00:00:00 2001 From: "S. Matthew English" Date: Mon, 25 Feb 2019 14:28:43 -0500 Subject: [PATCH 4/8] update x --- .../eth/sync/BlockPropagationManager.java | 25 ++++++++++++++----- .../eth/sync/BlockPropagationManagerTest.java | 2 +- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java index 12b3bfceb8..242ff32929 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java @@ -237,10 +237,17 @@ private CompletableFuture processAnnouncedBlock( return getBlockTask.run().thenCompose((r) -> importOrSavePendingBlock(r.getResult())); } - private CompletableFuture validateHeader(final Block block, final BlockHeader parent) { - final ProtocolSpec protocolSpec = protocolSchedule.getByBlockNumber(block.getHeader().getNumber()); + Boolean validateHeader(final Block block, final BlockHeader parent) throws Exception { + final ProtocolSpec protocolSpec = + protocolSchedule.getByBlockNumber(block.getHeader().getNumber()); final BlockHeaderValidator blockHeaderValidator = protocolSpec.getBlockHeaderValidator(); - return ethContext.getScheduler().scheduleComputationTask(() -> blockHeaderValidator.validateHeader(block.getHeader(), parent, protocolContext, HeaderValidationMode.FULL)); + return ethContext + .getScheduler() + .scheduleComputationTask( + () -> + blockHeaderValidator.validateHeader( + block.getHeader(), parent, protocolContext, HeaderValidationMode.FULL)) + .get(); } private void broadcastBlock(final Block block, final BlockHeader parent) { @@ -294,15 +301,21 @@ CompletableFuture importOrSavePendingBlock(final Block block) { + block.getHeader().getNumber() + ".")); - - if (!isValid.get()) { + boolean validation = false; + try { + validation = validateHeader(block, parent); + } catch (Exception ignored) { + } + if (!validation) { throw new IllegalArgumentException("Invalid block: " + block.getHeader().getNumber()); } ethContext.getScheduler().scheduleSyncWorkerTask(() -> broadcastBlock(block, parent)); // Import block - final PersistBlockTask importTask = PersistBlockTask.create(protocolSchedule, protocolContext, block, HeaderValidationMode.NONE, metricsSystem); + final PersistBlockTask importTask = + PersistBlockTask.create( + protocolSchedule, protocolContext, block, HeaderValidationMode.NONE, metricsSystem); return ethContext .getScheduler() diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManagerTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManagerTest.java index 1623f79836..ac97870826 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManagerTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManagerTest.java @@ -75,7 +75,7 @@ public static void setupSuite() { } @Before - public void setup() { + public void setup() throws Exception { blockchainUtil = BlockchainSetupUtil.forTesting(); blockchain = spy(blockchainUtil.getBlockchain()); protocolSchedule = blockchainUtil.getProtocolSchedule(); From bba49b61651ffbb61e4ead70239f49cb0b360d86 Mon Sep 17 00:00:00 2001 From: "S. Matthew English" Date: Mon, 25 Feb 2019 16:17:20 -0500 Subject: [PATCH 5/8] threading --- .../eth/sync/BlockPropagationManager.java | 80 ++++++++----------- 1 file changed, 33 insertions(+), 47 deletions(-) diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java index 242ff32929..43eabeefb6 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java @@ -237,19 +237,6 @@ private CompletableFuture processAnnouncedBlock( return getBlockTask.run().thenCompose((r) -> importOrSavePendingBlock(r.getResult())); } - Boolean validateHeader(final Block block, final BlockHeader parent) throws Exception { - final ProtocolSpec protocolSpec = - protocolSchedule.getByBlockNumber(block.getHeader().getNumber()); - final BlockHeaderValidator blockHeaderValidator = protocolSpec.getBlockHeaderValidator(); - return ethContext - .getScheduler() - .scheduleComputationTask( - () -> - blockHeaderValidator.validateHeader( - block.getHeader(), parent, protocolContext, HeaderValidationMode.FULL)) - .get(); - } - private void broadcastBlock(final Block block, final BlockHeader parent) { final UInt256 totalDifficulty = protocolContext @@ -260,7 +247,6 @@ private void broadcastBlock(final Block block, final BlockHeader parent) { blockBroadcaster.propagate(block, totalDifficulty); } - @SuppressWarnings("LocalCanBeFinal") @VisibleForTesting CompletableFuture importOrSavePendingBlock(final Block block) { // Synchronize to avoid race condition where block import event fires after the @@ -301,41 +287,41 @@ CompletableFuture importOrSavePendingBlock(final Block block) { + block.getHeader().getNumber() + ".")); - boolean validation = false; - try { - validation = validateHeader(block, parent); - } catch (Exception ignored) { - } - if (!validation) { - throw new IllegalArgumentException("Invalid block: " + block.getHeader().getNumber()); - } - - ethContext.getScheduler().scheduleSyncWorkerTask(() -> broadcastBlock(block, parent)); + final ProtocolSpec protocolSpec = + protocolSchedule.getByBlockNumber(block.getHeader().getNumber()); + final BlockHeaderValidator blockHeaderValidator = protocolSpec.getBlockHeaderValidator(); // Import block - final PersistBlockTask importTask = - PersistBlockTask.create( - protocolSchedule, protocolContext, block, HeaderValidationMode.NONE, metricsSystem); - - return ethContext - .getScheduler() - .scheduleSyncWorkerTask(importTask::run) - .whenComplete( - (r, t) -> { - importingBlocks.remove(block.getHash()); - if (t != null) { - LOG.warn( - "Failed to import announced block {} ({}).", - block.getHeader().getNumber(), - block.getHash()); - } else { - final double timeInMs = importTask.getTaskTimeInSec() * 1000; - LOG.info( - String.format( - "Successfully imported announced block %d (%s) in %01.3fms.", - block.getHeader().getNumber(), block.getHash(), timeInMs)); - } - }); + final PersistBlockTask importTask = PersistBlockTask.create(protocolSchedule, protocolContext, block, HeaderValidationMode.NONE, metricsSystem); + + ethContext.getScheduler() + .scheduleSyncWorkerTask( + () -> { + if(blockHeaderValidator.validateHeader(block.getHeader(), parent, protocolContext, HeaderValidationMode.FULL)) { + ethContext.getScheduler().scheduleSyncWorkerTask(() -> broadcastBlock(block, parent)); + ethContext.getScheduler().scheduleSyncWorkerTask(importTask::run).whenComplete( + (result, throwable) -> { + importingBlocks.remove(block.getHash()); + if (throwable != null) { + LOG.warn( + "Failed to import announced block {} ({}).", + block.getHeader().getNumber(), + block.getHash()); + } else { + final double timeInMs = importTask.getTaskTimeInSec() * 1000; + LOG.info( + String.format( + "Successfully imported announced block %d (%s) in %01.3fms.", + block.getHeader().getNumber(), block.getHash(), timeInMs)); + } + }); + } else { + importingBlocks.remove(block.getHash()); + LOG.warn("Failed to import announced block {} ({}).", block.getHeader().getNumber(), block.getHash()); + } + }); + + return null; } // Only import blocks within a certain range of our head and sync target From d6ed372469feef99c2d937f4795aac062400485f Mon Sep 17 00:00:00 2001 From: "S. Matthew English" Date: Mon, 25 Feb 2019 16:39:10 -0500 Subject: [PATCH 6/8] return value --- .../eth/sync/BlockPropagationManager.java | 77 +++++++++++-------- .../eth/sync/BlockPropagationManagerTest.java | 3 +- 2 files changed, 45 insertions(+), 35 deletions(-) diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java index 43eabeefb6..57cbce4618 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java @@ -287,41 +287,52 @@ CompletableFuture importOrSavePendingBlock(final Block block) { + block.getHeader().getNumber() + ".")); - final ProtocolSpec protocolSpec = - protocolSchedule.getByBlockNumber(block.getHeader().getNumber()); - final BlockHeaderValidator blockHeaderValidator = protocolSpec.getBlockHeaderValidator(); + final ProtocolSpec protocolSpec = + protocolSchedule.getByBlockNumber(block.getHeader().getNumber()); + final BlockHeaderValidator blockHeaderValidator = protocolSpec.getBlockHeaderValidator(); // Import block - final PersistBlockTask importTask = PersistBlockTask.create(protocolSchedule, protocolContext, block, HeaderValidationMode.NONE, metricsSystem); - - ethContext.getScheduler() - .scheduleSyncWorkerTask( - () -> { - if(blockHeaderValidator.validateHeader(block.getHeader(), parent, protocolContext, HeaderValidationMode.FULL)) { - ethContext.getScheduler().scheduleSyncWorkerTask(() -> broadcastBlock(block, parent)); - ethContext.getScheduler().scheduleSyncWorkerTask(importTask::run).whenComplete( - (result, throwable) -> { - importingBlocks.remove(block.getHash()); - if (throwable != null) { - LOG.warn( - "Failed to import announced block {} ({}).", - block.getHeader().getNumber(), - block.getHash()); - } else { - final double timeInMs = importTask.getTaskTimeInSec() * 1000; - LOG.info( - String.format( - "Successfully imported announced block %d (%s) in %01.3fms.", - block.getHeader().getNumber(), block.getHash(), timeInMs)); - } - }); - } else { - importingBlocks.remove(block.getHash()); - LOG.warn("Failed to import announced block {} ({}).", block.getHeader().getNumber(), block.getHash()); - } - }); - - return null; + final PersistBlockTask importTask = + PersistBlockTask.create( + protocolSchedule, protocolContext, block, HeaderValidationMode.NONE, metricsSystem); + + return ethContext + .getScheduler() + .scheduleSyncWorkerTask( + () -> { + if (blockHeaderValidator.validateHeader( + block.getHeader(), parent, protocolContext, HeaderValidationMode.FULL)) { + ethContext + .getScheduler() + .scheduleSyncWorkerTask(() -> broadcastBlock(block, parent)); + return ethContext + .getScheduler() + .scheduleSyncWorkerTask(importTask::run) + .whenComplete( + (result, throwable) -> { + importingBlocks.remove(block.getHash()); + if (throwable != null) { + LOG.warn( + "Failed to import announced block {} ({}).", + block.getHeader().getNumber(), + block.getHash()); + } else { + final double timeInMs = importTask.getTaskTimeInSec() * 1000; + LOG.info( + String.format( + "Successfully imported announced block %d (%s) in %01.3fms.", + block.getHeader().getNumber(), block.getHash(), timeInMs)); + } + }); + } else { + importingBlocks.remove(block.getHash()); + LOG.warn( + "Failed to import announced block {} ({}).", + block.getHeader().getNumber(), + block.getHash()); + return CompletableFuture.completedFuture(block); + } + }); } // Only import blocks within a certain range of our head and sync target diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManagerTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManagerTest.java index ac97870826..1a6c66bed3 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManagerTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManagerTest.java @@ -75,7 +75,7 @@ public static void setupSuite() { } @Before - public void setup() throws Exception { + public void setup() { blockchainUtil = BlockchainSetupUtil.forTesting(); blockchain = spy(blockchainUtil.getBlockchain()); protocolSchedule = blockchainUtil.getProtocolSchedule(); @@ -543,7 +543,6 @@ public void shouldNotImportBlocksThatAreAlreadyBeingImported() { final EthScheduler ethScheduler = mock(EthScheduler.class); when(ethScheduler.scheduleSyncWorkerTask(any(Supplier.class))) .thenReturn(new CompletableFuture<>()); - final EthContext ethContext = new EthContext("eth", new EthPeers("eth"), new EthMessages(), ethScheduler); final BlockPropagationManager blockPropagationManager = From 4e95ccae8b958f1ee99e89f62653c4dbdf1110a2 Mon Sep 17 00:00:00 2001 From: "S. Matthew English" Date: Mon, 25 Feb 2019 18:13:14 -0500 Subject: [PATCH 7/8] nxt-update --- .../eth/sync/BlockPropagationManager.java | 69 ++++++++++--------- 1 file changed, 36 insertions(+), 33 deletions(-) diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java index 57cbce4618..22c2507af6 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java @@ -290,47 +290,50 @@ CompletableFuture importOrSavePendingBlock(final Block block) { final ProtocolSpec protocolSpec = protocolSchedule.getByBlockNumber(block.getHeader().getNumber()); final BlockHeaderValidator blockHeaderValidator = protocolSpec.getBlockHeaderValidator(); + return ethContext + .getScheduler() + .scheduleSyncWorkerTask( + () -> validateAndProcessPendingBlock(blockHeaderValidator, block, parent)); + } + + private CompletableFuture validateAndProcessPendingBlock( + final BlockHeaderValidator blockHeaderValidator, + final Block block, + final BlockHeader parent) { + if (blockHeaderValidator.validateHeader( + block.getHeader(), parent, protocolContext, HeaderValidationMode.FULL)) { + ethContext.getScheduler().scheduleSyncWorkerTask(() -> broadcastBlock(block, parent)); + return ethContext.getScheduler().scheduleSyncWorkerTask(() -> runImportTask(block)); + } else { + importingBlocks.remove(block.getHash()); + LOG.warn( + "Failed to import announced block {} ({}).", + block.getHeader().getNumber(), + block.getHash()); + return CompletableFuture.completedFuture(block); + } + } - // Import block + private CompletableFuture runImportTask(final Block block) { final PersistBlockTask importTask = PersistBlockTask.create( protocolSchedule, protocolContext, block, HeaderValidationMode.NONE, metricsSystem); - - return ethContext - .getScheduler() - .scheduleSyncWorkerTask( - () -> { - if (blockHeaderValidator.validateHeader( - block.getHeader(), parent, protocolContext, HeaderValidationMode.FULL)) { - ethContext - .getScheduler() - .scheduleSyncWorkerTask(() -> broadcastBlock(block, parent)); - return ethContext - .getScheduler() - .scheduleSyncWorkerTask(importTask::run) - .whenComplete( - (result, throwable) -> { - importingBlocks.remove(block.getHash()); - if (throwable != null) { - LOG.warn( - "Failed to import announced block {} ({}).", - block.getHeader().getNumber(), - block.getHash()); - } else { - final double timeInMs = importTask.getTaskTimeInSec() * 1000; - LOG.info( - String.format( - "Successfully imported announced block %d (%s) in %01.3fms.", - block.getHeader().getNumber(), block.getHash(), timeInMs)); - } - }); - } else { - importingBlocks.remove(block.getHash()); + return importTask + .run() + .whenComplete( + (result, throwable) -> { + importingBlocks.remove(block.getHash()); + if (throwable != null) { LOG.warn( "Failed to import announced block {} ({}).", block.getHeader().getNumber(), block.getHash()); - return CompletableFuture.completedFuture(block); + } else { + final double timeInMs = importTask.getTaskTimeInSec() * 1000; + LOG.info( + String.format( + "Successfully imported announced block %d (%s) in %01.3fms.", + block.getHeader().getNumber(), block.getHash(), timeInMs)); } }); } From 9c8804f6204db855cab4cc3b70814b35ab74c091 Mon Sep 17 00:00:00 2001 From: "S. Matthew English" Date: Mon, 25 Feb 2019 18:57:28 -0500 Subject: [PATCH 8/8] update update --- .../pantheon/ethereum/eth/sync/BlockPropagationManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java index 22c2507af6..47c52cfae8 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockPropagationManager.java @@ -303,7 +303,7 @@ private CompletableFuture validateAndProcessPendingBlock( if (blockHeaderValidator.validateHeader( block.getHeader(), parent, protocolContext, HeaderValidationMode.FULL)) { ethContext.getScheduler().scheduleSyncWorkerTask(() -> broadcastBlock(block, parent)); - return ethContext.getScheduler().scheduleSyncWorkerTask(() -> runImportTask(block)); + return runImportTask(block); } else { importingBlocks.remove(block.getHash()); LOG.warn(