Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
Ibft notifies EthPeer when remote node has a better block (#849)
Browse files Browse the repository at this point in the history
There is a failure mode of IBFT whereby a validator fails to import
a block, and also fails to receive the NewBlock message from its
peers. This means said validator is unable to participate in
subsequent rounds, and may cause the network to halt.

To overcome this issue, if an IBFT validator receives messages from
a future height, it will update the "BestEstimatedHeight" of the
corresponding EthPeer object, such that the Synchroniser will
(eventually) download the requisite blocks - thus allowing the
IBFT network to continue to operate.
  • Loading branch information
rain-on authored Feb 20, 2019
1 parent 38edb0f commit 8b5fada
Show file tree
Hide file tree
Showing 13 changed files with 282 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -124,6 +125,10 @@ public void commitForNonProposing(final ConsensusRoundIdentifier roundId, final
nonProposingPeers.forEach(peer -> peer.injectCommit(roundId, hash));
}

public void forNonProposing(final Consumer<ValidatorPeer> assertion) {
nonProposingPeers.forEach(assertion);
}

public Collection<SignedData<PreparePayload>> createSignedPreparePayloadOfNonProposing(
final ConsensusRoundIdentifier preparedRound, final Hash hash) {
return nonProposingPeers.stream()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.consensus.ibft.support;

import static java.util.function.Function.identity;

import tech.pegasys.pantheon.consensus.ibft.SynchronizerUpdater;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class StubbedSynchronizerUpdater implements SynchronizerUpdater {

private Map<PeerConnection, ValidatorPeer> validatorNodes = new HashMap<>();

@Override
public void updatePeerChainState(
final long knownBlockNumber, final PeerConnection peerConnection) {
validatorNodes.get(peerConnection).updateEstimatedChainHeight(knownBlockNumber);
}

public void addNetworkPeers(final Collection<ValidatorPeer> nodes) {
validatorNodes.putAll(
nodes.stream().collect(Collectors.toMap(ValidatorPeer::getPeerConnection, identity())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import tech.pegasys.pantheon.consensus.ibft.IbftHelpers;
import tech.pegasys.pantheon.consensus.ibft.IbftProtocolSchedule;
import tech.pegasys.pantheon.consensus.ibft.RoundTimer;
import tech.pegasys.pantheon.consensus.ibft.SynchronizerUpdater;
import tech.pegasys.pantheon.consensus.ibft.UniqueMessageMulticaster;
import tech.pegasys.pantheon.consensus.ibft.blockcreation.IbftBlockCreatorFactory;
import tech.pegasys.pantheon.consensus.ibft.blockcreation.ProposerSelector;
Expand Down Expand Up @@ -164,9 +165,17 @@ public TestContext build() {

final Gossiper gossiper = useGossip ? new IbftGossip(uniqueMulticaster) : mock(Gossiper.class);

final StubbedSynchronizerUpdater synchronizerUpdater = new StubbedSynchronizerUpdater();

final ControllerAndState controllerAndState =
createControllerAndFinalState(
blockChain, multicaster, nodeKeys, clock, ibftEventQueue, gossiper);
blockChain,
multicaster,
nodeKeys,
clock,
ibftEventQueue,
gossiper,
synchronizerUpdater);

// Add each networkNode to the Multicaster (such that each can receive msgs from local node).
// NOTE: the remotePeers needs to be ordered based on Address (as this is used to determine
Expand All @@ -187,6 +196,7 @@ public TestContext build() {
LinkedHashMap::new));

multicaster.addNetworkPeers(remotePeers.values());
synchronizerUpdater.addNetworkPeers(remotePeers.values());

return new TestContext(
remotePeers,
Expand Down Expand Up @@ -227,7 +237,8 @@ private static ControllerAndState createControllerAndFinalState(
final KeyPair nodeKeys,
final Clock clock,
final IbftEventQueue ibftEventQueue,
final Gossiper gossiper) {
final Gossiper gossiper,
final SynchronizerUpdater synchronizerUpdater) {

final WorldStateArchive worldStateArchive = createInMemoryWorldStateArchive();

Expand Down Expand Up @@ -312,7 +323,8 @@ private static ControllerAndState createControllerAndFinalState(
messageValidatorFactory),
messageValidatorFactory),
gossiper,
DUPLICATE_MESSAGE_LIMIT);
DUPLICATE_MESSAGE_LIMIT,
synchronizerUpdater);

final EventMultiplexer eventMultiplexer = new EventMultiplexer(ibftController);
//////////////////////////// END IBFT PantheonController ////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package tech.pegasys.pantheon.consensus.ibft.support;

import static org.assertj.core.api.Assertions.assertThat;

import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier;
import tech.pegasys.pantheon.consensus.ibft.EventMultiplexer;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftEvents;
Expand Down Expand Up @@ -53,6 +55,7 @@ public class ValidatorPeer {
private final List<MessageData> receivedMessages = Lists.newArrayList();

private final EventMultiplexer localEventMultiplexer;
private long estimatedChainHeight = 0;

public ValidatorPeer(
final NodeParams nodeParams,
Expand All @@ -74,6 +77,10 @@ public KeyPair getNodeKeys() {
return nodeKeys;
}

public PeerConnection getPeerConnection() {
return peerConnection;
}

public Proposal injectProposal(final ConsensusRoundIdentifier rId, final Block block) {
final Proposal payload = messageFactory.createProposal(rId, block, Optional.empty());

Expand Down Expand Up @@ -147,4 +154,12 @@ public MessageFactory getMessageFactory() {
public KeyPair getNodeKeyPair() {
return nodeKeys;
}

public void updateEstimatedChainHeight(final long estimatedChainHeight) {
this.estimatedChainHeight = estimatedChainHeight;
}

public void verifyEstimatedChainHeightEquals(final long expectedChainHeight) {
assertThat(estimatedChainHeight).isEqualTo(expectedChainHeight);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,17 @@ public void messagesForFutureHeightAreBufferedUntilChainHeightCatchesUp() {
peers.getProposer().injectProposal(futureHeightRoundId, futureHeightBlock);
peers.verifyNoMessagesReceived();

// verify that we have incremented the estimated height of the proposer.
peers
.getProposer()
.verifyEstimatedChainHeightEquals(futureHeightBlock.getHeader().getNumber() - 1);

// Inject prepares and commits from all peers
peers.prepareForNonProposing(futureHeightRoundId, futureHeightBlock.getHash());
peers.forNonProposing(
peer ->
peer.verifyEstimatedChainHeightEquals(futureHeightBlock.getHeader().getNumber() - 1));

peers.commitForNonProposing(futureHeightRoundId, futureHeightBlock.getHash());

peers.verifyNoMessagesReceived();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.consensus.ibft;

import static org.apache.logging.log4j.LogManager.getLogger;

import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;

import org.apache.logging.log4j.Logger;

public class EthSynchronizerUpdater implements SynchronizerUpdater {

private static final Logger LOG = getLogger();
private final EthPeers ethPeers;

public EthSynchronizerUpdater(final EthPeers ethPeers) {
this.ethPeers = ethPeers;
}

@Override
public void updatePeerChainState(
final long knownBlockNumber, final PeerConnection peerConnection) {
final EthPeer ethPeer = ethPeers.peer(peerConnection);
if (ethPeer == null) {
LOG.debug("Received message from a peer with no corresponding EthPeer.");
return;
}
ethPeer.chainState().updateHeightEstimate(knownBlockNumber);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.consensus.ibft;

import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;

public interface SynchronizerUpdater {

void updatePeerChainState(long knownBlockNumber, PeerConnection peerConnection);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier;
import tech.pegasys.pantheon.consensus.ibft.Gossiper;
import tech.pegasys.pantheon.consensus.ibft.MessageTracker;
import tech.pegasys.pantheon.consensus.ibft.SynchronizerUpdater;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.BlockTimerExpiry;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftReceivedMessageEvent;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.NewChainHead;
Expand Down Expand Up @@ -44,6 +45,7 @@
import org.apache.logging.log4j.Logger;

public class IbftController {

private static final Logger LOG = LogManager.getLogger();
private final Blockchain blockchain;
private final IbftFinalState ibftFinalState;
Expand All @@ -52,20 +54,23 @@ public class IbftController {
private BlockHeightManager currentHeightManager;
private final Gossiper gossiper;
private final MessageTracker duplicateMessageTracker;
private final SynchronizerUpdater sychronizerUpdater;

public IbftController(
final Blockchain blockchain,
final IbftFinalState ibftFinalState,
final IbftBlockHeightManagerFactory ibftBlockHeightManagerFactory,
final Gossiper gossiper,
final int duplicateMessageLimit) {
final int duplicateMessageLimit,
final SynchronizerUpdater sychronizerUpdater) {
this(
blockchain,
ibftFinalState,
ibftBlockHeightManagerFactory,
gossiper,
Maps.newHashMap(),
new MessageTracker(duplicateMessageLimit));
new MessageTracker(duplicateMessageLimit),
sychronizerUpdater);
}

@VisibleForTesting
Expand All @@ -75,13 +80,15 @@ public IbftController(
final IbftBlockHeightManagerFactory ibftBlockHeightManagerFactory,
final Gossiper gossiper,
final Map<Long, List<Message>> futureMessages,
final MessageTracker duplicateMessageTracker) {
final MessageTracker duplicateMessageTracker,
final SynchronizerUpdater sychronizerUpdater) {
this.blockchain = blockchain;
this.ibftFinalState = ibftFinalState;
this.ibftBlockHeightManagerFactory = ibftBlockHeightManagerFactory;
this.futureMessages = futureMessages;
this.gossiper = gossiper;
this.duplicateMessageTracker = duplicateMessageTracker;
this.sychronizerUpdater = sychronizerUpdater;
}

public void start() {
Expand Down Expand Up @@ -213,6 +220,10 @@ private boolean processMessage(final IbftMessage<?> msg, final Message rawMsg) {
return isMsgFromKnownValidator(msg) && ibftFinalState.isLocalNodeValidator();
} else if (isMsgForFutureChainHeight(msgRoundIdentifier)) {
addMessageToFutureMessageBuffer(msgRoundIdentifier.getSequenceNumber(), rawMsg);
// Notify the synchronizer the transmitting peer must have the parent block to the received
// message's target height.
sychronizerUpdater.updatePeerChainState(
msgRoundIdentifier.getSequenceNumber() - 1L, rawMsg.getConnection());
} else {
LOG.debug(
"IBFT message discarded as it is from a previous block height messageType={} chainHeight={} eventHeight={}",
Expand Down
Loading

0 comments on commit 8b5fada

Please sign in to comment.