diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftFinalState.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftFinalState.java index 8912800d15..0fbd7a2820 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftFinalState.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftFinalState.java @@ -15,7 +15,6 @@ import static tech.pegasys.pantheon.consensus.ibft.IbftHelpers.calculateRequiredValidatorQuorum; import tech.pegasys.pantheon.consensus.common.ValidatorProvider; -import tech.pegasys.pantheon.consensus.common.VoteTally; import tech.pegasys.pantheon.consensus.ibft.BlockTimer; import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier; import tech.pegasys.pantheon.consensus.ibft.IbftContext; @@ -44,7 +43,7 @@ public class IbftFinalState { private final BlockHeaderValidator ibftContextBlockHeaderValidator; public IbftFinalState( - final VoteTally validatorProvider, + final ValidatorProvider validatorProvider, final KeyPair nodeKeys, final Address localAddress, final ProposerSelector proposerSelector, diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/RoundChangeManager.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/RoundChangeManager.java new file mode 100644 index 0000000000..8649ce47d7 --- /dev/null +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/RoundChangeManager.java @@ -0,0 +1,150 @@ +/* + * Copyright 2018 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.consensus.ibft.statemachine; + +import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier; +import tech.pegasys.pantheon.consensus.ibft.IbftHelpers; +import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.RoundChangeCertificate; +import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.RoundChangePayload; +import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.SignedData; +import tech.pegasys.pantheon.consensus.ibft.validation.RoundChangeMessageValidator; +import tech.pegasys.pantheon.consensus.ibft.validation.RoundChangeMessageValidator.MessageValidatorFactory; +import tech.pegasys.pantheon.ethereum.core.Address; + +import java.util.Collection; +import java.util.Map; +import java.util.Optional; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * Responsible for handling all RoundChange messages received for a given block height + * (theoretically, RoundChange messages for a older height should have been previously discarded, + * and messages for a future round should have been buffered). + * + *

If enough RoundChange messages all targeting a given round are received (and this node is the + * proposer for said round) - a newRound message is sent, and a new round should be started by the + * controlling class. + */ +public class RoundChangeManager { + + public static class RoundChangeStatus { + private final int quorumSize; + + // Store only 1 round change per round per validator + @VisibleForTesting + final Map> receivedMessages = Maps.newHashMap(); + + private boolean actioned = false; + + public RoundChangeStatus(final int quorumSize) { + this.quorumSize = quorumSize; + } + + public void addMessage(final SignedData msg) { + if (!actioned) { + receivedMessages.put(msg.getSender(), msg); + } + } + + public boolean roundChangeReady() { + return receivedMessages.size() >= quorumSize && !actioned; + } + + public RoundChangeCertificate createRoundChangeCertificate() { + if (roundChangeReady()) { + actioned = true; + return new RoundChangeCertificate(receivedMessages.values()); + } else { + throw new IllegalStateException("Unable to create RoundChangeCertificate at this time."); + } + } + } + + private static final Logger LOG = LogManager.getLogger(); + + @VisibleForTesting + final Map roundChangeCache = Maps.newHashMap(); + + private final int quorumSize; + private final RoundChangeMessageValidator roundChangeMessageValidator; + + public RoundChangeManager( + final long sequenceNumber, + final Collection

validators, + final MessageValidatorFactory messageValidityFactory) { + this.quorumSize = IbftHelpers.calculateRequiredValidatorQuorum(validators.size()); + this.roundChangeMessageValidator = + new RoundChangeMessageValidator( + messageValidityFactory, validators, quorumSize, sequenceNumber); + } + + /** + * Adds the round message to this manager and return a certificate if it passes the threshold + * + * @param msg The signed round change message to add + * @return Empty if the round change threshold hasn't been hit, otherwise a round change + * certificate + */ + public Optional appendRoundChangeMessage( + final SignedData msg) { + + if (!isMessageValid(msg)) { + LOG.info("RoundChange message was invalid."); + return Optional.empty(); + } + + final RoundChangeStatus roundChangeStatus = storeRoundChangeMessage(msg); + + if (roundChangeStatus.roundChangeReady()) { + return Optional.of(roundChangeStatus.createRoundChangeCertificate()); + } + + return Optional.empty(); + } + + private boolean isMessageValid(final SignedData msg) { + return roundChangeMessageValidator.validateMessage(msg); + } + + private RoundChangeStatus storeRoundChangeMessage(final SignedData msg) { + final ConsensusRoundIdentifier msgTargetRound = msg.getPayload().getRoundChangeIdentifier(); + + final RoundChangeStatus roundChangeStatus = + roundChangeCache.computeIfAbsent( + msgTargetRound, ignored -> new RoundChangeStatus(quorumSize)); + + roundChangeStatus.addMessage(msg); + + return roundChangeStatus; + } + + /** + * Clears old rounds from storage that have been superseded by a given round + * + * @param completedRoundIdentifier round identifier that has been identified as superseded + */ + public void discardCompletedRound(final ConsensusRoundIdentifier completedRoundIdentifier) { + roundChangeCache + .entrySet() + .removeIf(entry -> isAnEarlierOrEqualRound(entry.getKey(), completedRoundIdentifier)); + } + + private boolean isAnEarlierOrEqualRound( + final ConsensusRoundIdentifier left, final ConsensusRoundIdentifier right) { + return left.getRoundNumber() <= right.getRoundNumber(); + } +} diff --git a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/RoundChangeManagerTest.java b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/RoundChangeManagerTest.java new file mode 100644 index 0000000000..a69752137f --- /dev/null +++ b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/RoundChangeManagerTest.java @@ -0,0 +1,196 @@ +/* + * Copyright 2018 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.consensus.ibft.statemachine; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier; +import tech.pegasys.pantheon.consensus.ibft.IbftContext; +import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.MessageFactory; +import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.RoundChangePayload; +import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.SignedData; +import tech.pegasys.pantheon.consensus.ibft.validation.MessageValidator; +import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair; +import tech.pegasys.pantheon.ethereum.ProtocolContext; +import tech.pegasys.pantheon.ethereum.chain.MutableBlockchain; +import tech.pegasys.pantheon.ethereum.core.Address; +import tech.pegasys.pantheon.ethereum.core.BlockHeader; +import tech.pegasys.pantheon.ethereum.core.Util; +import tech.pegasys.pantheon.ethereum.db.WorldStateArchive; +import tech.pegasys.pantheon.ethereum.mainnet.BlockHeaderValidator; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.junit.Before; +import org.junit.Test; + +public class RoundChangeManagerTest { + + private RoundChangeManager manager; + + private final KeyPair proposerKey = KeyPair.generate(); + private final KeyPair validator1Key = KeyPair.generate(); + private final KeyPair validator2Key = KeyPair.generate(); + private final KeyPair nonValidatorKey = KeyPair.generate(); + + private final ConsensusRoundIdentifier ri1 = new ConsensusRoundIdentifier(2, 1); + private final ConsensusRoundIdentifier ri2 = new ConsensusRoundIdentifier(2, 2); + private final ConsensusRoundIdentifier ri3 = new ConsensusRoundIdentifier(2, 3); + + @Before + public void setup() { + List
validators = Lists.newArrayList(); + + validators.add(Util.publicKeyToAddress(proposerKey.getPublicKey())); + validators.add(Util.publicKeyToAddress(validator1Key.getPublicKey())); + validators.add(Util.publicKeyToAddress(validator2Key.getPublicKey())); + + final ProtocolContext protocolContext = + new ProtocolContext<>( + mock(MutableBlockchain.class), mock(WorldStateArchive.class), mock(IbftContext.class)); + + @SuppressWarnings("unchecked") + BlockHeaderValidator headerValidator = + (BlockHeaderValidator) mock(BlockHeaderValidator.class); + BlockHeader parentHeader = mock(BlockHeader.class); + + Map messageValidators = Maps.newHashMap(); + + messageValidators.put( + ri1, + new MessageValidator( + validators, + Util.publicKeyToAddress(proposerKey.getPublicKey()), + ri1, + headerValidator, + protocolContext, + parentHeader)); + + messageValidators.put( + ri2, + new MessageValidator( + validators, + Util.publicKeyToAddress(validator1Key.getPublicKey()), + ri2, + headerValidator, + protocolContext, + parentHeader)); + + messageValidators.put( + ri3, + new MessageValidator( + validators, + Util.publicKeyToAddress(validator2Key.getPublicKey()), + ri3, + headerValidator, + protocolContext, + parentHeader)); + + manager = new RoundChangeManager(2, validators, messageValidators::get); + } + + private SignedData makeRoundChangeMessage( + final KeyPair key, final ConsensusRoundIdentifier round) { + MessageFactory messageFactory = new MessageFactory(key); + return messageFactory.createSignedRoundChangePayload(round, Optional.empty()); + } + + @Test + public void rejectsInvalidRoundChangeMessage() { + SignedData roundChangeData = makeRoundChangeMessage(nonValidatorKey, ri1); + assertThat(manager.appendRoundChangeMessage(roundChangeData)).isEmpty(); + assertThat(manager.roundChangeCache.get(ri1)).isNull(); + } + + @Test + public void acceptsValidRoundChangeMessage() { + SignedData roundChangeData = makeRoundChangeMessage(proposerKey, ri2); + assertThat(manager.appendRoundChangeMessage(roundChangeData)).isEmpty(); + assertThat(manager.roundChangeCache.get(ri2).receivedMessages.size()).isEqualTo(1); + } + + @Test + public void doesntAcceptDuplicateValidRoundChangeMessage() { + SignedData roundChangeData = makeRoundChangeMessage(proposerKey, ri2); + assertThat(manager.appendRoundChangeMessage(roundChangeData)).isEmpty(); + assertThat(manager.appendRoundChangeMessage(roundChangeData)).isEmpty(); + assertThat(manager.roundChangeCache.get(ri2).receivedMessages.size()).isEqualTo(1); + } + + @Test + public void becomesReadyAtThreshold() { + SignedData roundChangeDataProposer = + makeRoundChangeMessage(proposerKey, ri2); + SignedData roundChangeDataValidator1 = + makeRoundChangeMessage(validator1Key, ri2); + assertThat(manager.appendRoundChangeMessage(roundChangeDataProposer)) + .isEqualTo(Optional.empty()); + assertThat(manager.appendRoundChangeMessage(roundChangeDataValidator1).isPresent()).isTrue(); + } + + @Test + public void doesntReachReadyWhenSuppliedWithDifferentRounds() { + SignedData roundChangeDataProposer = + makeRoundChangeMessage(proposerKey, ri2); + SignedData roundChangeDataValidator1 = + makeRoundChangeMessage(validator1Key, ri3); + assertThat(manager.appendRoundChangeMessage(roundChangeDataProposer)) + .isEqualTo(Optional.empty()); + assertThat(manager.appendRoundChangeMessage(roundChangeDataValidator1)) + .isEqualTo(Optional.empty()); + assertThat(manager.roundChangeCache.get(ri2).receivedMessages.size()).isEqualTo(1); + assertThat(manager.roundChangeCache.get(ri3).receivedMessages.size()).isEqualTo(1); + } + + @Test + public void discardsPreviousRounds() { + SignedData roundChangeDataProposer = + makeRoundChangeMessage(proposerKey, ri1); + SignedData roundChangeDataValidator1 = + makeRoundChangeMessage(validator1Key, ri2); + SignedData roundChangeDataValidator2 = + makeRoundChangeMessage(validator2Key, ri3); + assertThat(manager.appendRoundChangeMessage(roundChangeDataProposer)) + .isEqualTo(Optional.empty()); + assertThat(manager.appendRoundChangeMessage(roundChangeDataValidator1)) + .isEqualTo(Optional.empty()); + assertThat(manager.appendRoundChangeMessage(roundChangeDataValidator2)) + .isEqualTo(Optional.empty()); + manager.discardCompletedRound(ri1); + assertThat(manager.roundChangeCache.get(ri1)).isNull(); + assertThat(manager.roundChangeCache.get(ri2).receivedMessages.size()).isEqualTo(1); + assertThat(manager.roundChangeCache.get(ri3).receivedMessages.size()).isEqualTo(1); + } + + @Test + public void stopsAcceptingMessagesAfterReady() { + SignedData roundChangeDataProposer = + makeRoundChangeMessage(proposerKey, ri2); + SignedData roundChangeDataValidator1 = + makeRoundChangeMessage(validator1Key, ri2); + SignedData roundChangeDataValidator2 = + makeRoundChangeMessage(validator2Key, ri2); + assertThat(manager.appendRoundChangeMessage(roundChangeDataProposer)) + .isEqualTo(Optional.empty()); + assertThat(manager.appendRoundChangeMessage(roundChangeDataValidator1).isPresent()).isTrue(); + assertThat(manager.roundChangeCache.get(ri2).receivedMessages.size()).isEqualTo(2); + assertThat(manager.appendRoundChangeMessage(roundChangeDataValidator2)) + .isEqualTo(Optional.empty()); + assertThat(manager.roundChangeCache.get(ri2).receivedMessages.size()).isEqualTo(2); + } +}