diff --git a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/MessageReceptionHelpers.java b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/MessageReceptionHelpers.java index 0a922fce95..f76f454cff 100644 --- a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/MessageReceptionHelpers.java +++ b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/MessageReceptionHelpers.java @@ -14,12 +14,12 @@ import static org.assertj.core.api.Assertions.assertThat; -import tech.pegasys.pantheon.consensus.ibft.ibftmessage.CommitMessage; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.CommitMessageData; import tech.pegasys.pantheon.consensus.ibft.ibftmessage.IbftV2; -import tech.pegasys.pantheon.consensus.ibft.ibftmessage.NewRoundMessage; -import tech.pegasys.pantheon.consensus.ibft.ibftmessage.PrepareMessage; -import tech.pegasys.pantheon.consensus.ibft.ibftmessage.ProposalMessage; -import tech.pegasys.pantheon.consensus.ibft.ibftmessage.RoundChangeMessage; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.NewRoundMessageData; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.PrepareMessageData; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.ProposalMessageData; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.RoundChangeMessageData; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.Payload; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.SignedData; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; @@ -60,15 +60,15 @@ public static boolean msgMatchesExpected( switch (expectedPayload.getMessageType()) { case IbftV2.PROPOSAL: - return ProposalMessage.fromMessage(actual).decode().equals(expected); + return ProposalMessageData.fromMessageData(actual).decode().equals(expected); case IbftV2.PREPARE: - return PrepareMessage.fromMessage(actual).decode().equals(expected); + return PrepareMessageData.fromMessageData(actual).decode().equals(expected); case IbftV2.COMMIT: - return CommitMessage.fromMessage(actual).decode().equals(expected); + return CommitMessageData.fromMessageData(actual).decode().equals(expected); case IbftV2.NEW_ROUND: - return NewRoundMessage.fromMessage(actual).decode().equals(expected); + return NewRoundMessageData.fromMessageData(actual).decode().equals(expected); case IbftV2.ROUND_CHANGE: - return RoundChangeMessage.fromMessage(actual).decode().equals(expected); + return RoundChangeMessageData.fromMessageData(actual).decode().equals(expected); default: return false; } diff --git a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/StubIbftMulticaster.java b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/StubIbftMulticaster.java index 24f41065a2..57d123abd0 100644 --- a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/StubIbftMulticaster.java +++ b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/StubIbftMulticaster.java @@ -13,6 +13,7 @@ package tech.pegasys.pantheon.consensus.ibft.support; import tech.pegasys.pantheon.consensus.ibft.network.IbftMulticaster; +import tech.pegasys.pantheon.ethereum.core.Address; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import java.util.Collection; @@ -32,6 +33,15 @@ public void addNetworkPeers(final Collection nodes) { @Override public void multicastToValidators(final MessageData message) { - validatorNodes.forEach(v -> v.handleReceivedMessage(message)); + validatorNodes.forEach(peer -> peer.handleReceivedMessage(message)); + } + + @Override + public void multicastToValidatorsExcept( + final MessageData message, final Collection
exceptAddresses) { + validatorNodes + .stream() + .filter(peer -> !exceptAddresses.contains(peer.getNodeAddress())) + .forEach(peer -> peer.handleReceivedMessage(message)); } } diff --git a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/StubbedPeerConnection.java b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/StubbedPeerConnection.java new file mode 100644 index 0000000000..247a1566a6 --- /dev/null +++ b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/StubbedPeerConnection.java @@ -0,0 +1,58 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.consensus.ibft.support; + +import static java.util.Collections.emptyList; + +import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; +import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection; +import tech.pegasys.pantheon.ethereum.p2p.wire.Capability; +import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo; +import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason; +import tech.pegasys.pantheon.util.bytes.BytesValue; + +import java.net.SocketAddress; +import java.util.Set; + +public class StubbedPeerConnection implements PeerConnection { + + @Override + public void send(final Capability capability, final MessageData message) + throws PeerNotConnected {} + + @Override + public Set getAgreedCapabilities() { + return null; + } + + @Override + public PeerInfo getPeer() { + return new PeerInfo(0, "IbftIntTestPeer", emptyList(), 0, BytesValue.EMPTY); + } + + @Override + public void terminateConnection(final DisconnectReason reason, final boolean peerInitiated) {} + + @Override + public void disconnect(final DisconnectReason reason) {} + + @Override + public SocketAddress getLocalAddress() { + return null; + } + + @Override + public SocketAddress getRemoteAddress() { + return null; + } +} diff --git a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/TestContextFactory.java b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/TestContextFactory.java index 901660d2ee..bb5414f0a6 100644 --- a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/TestContextFactory.java +++ b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/TestContextFactory.java @@ -13,6 +13,7 @@ package tech.pegasys.pantheon.consensus.ibft.support; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.mockito.Mockito.mock; import static tech.pegasys.pantheon.ethereum.core.InMemoryStorageProvider.createInMemoryBlockchain; import static tech.pegasys.pantheon.ethereum.core.InMemoryStorageProvider.createInMemoryWorldStateArchive; @@ -29,6 +30,7 @@ import tech.pegasys.pantheon.consensus.ibft.IbftContext; import tech.pegasys.pantheon.consensus.ibft.IbftEventQueue; import tech.pegasys.pantheon.consensus.ibft.IbftExtraData; +import tech.pegasys.pantheon.consensus.ibft.IbftGossip; import tech.pegasys.pantheon.consensus.ibft.IbftHelpers; import tech.pegasys.pantheon.consensus.ibft.IbftProtocolSchedule; import tech.pegasys.pantheon.consensus.ibft.RoundTimer; @@ -64,6 +66,7 @@ import java.time.Instant; import java.time.ZoneId; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; @@ -252,6 +255,9 @@ private static ControllerAndState createControllerAndFinalState( final MessageValidatorFactory messageValidatorFactory = new MessageValidatorFactory(proposerSelector, blockHeaderValidator, protocolContext); + // Disable Gossiping for integration tests. + final IbftGossip gossiper = mock(IbftGossip.class); + final IbftController ibftController = new IbftController( blockChain, @@ -260,7 +266,9 @@ private static ControllerAndState createControllerAndFinalState( finalState, new IbftRoundFactory(finalState, protocolContext, protocolSchedule), messageValidatorFactory, - protocolContext)); + protocolContext), + new HashMap<>(), + gossiper); //////////////////////////// END IBFT PantheonController //////////////////////////// return new ControllerAndState(ibftController, finalState); diff --git a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/ValidatorPeer.java b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/ValidatorPeer.java index ccd2cc4180..74260ef007 100644 --- a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/ValidatorPeer.java +++ b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/ValidatorPeer.java @@ -15,11 +15,11 @@ import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier; import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftEvents; import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftReceivedMessageEvent; -import tech.pegasys.pantheon.consensus.ibft.ibftmessage.CommitMessage; -import tech.pegasys.pantheon.consensus.ibft.ibftmessage.NewRoundMessage; -import tech.pegasys.pantheon.consensus.ibft.ibftmessage.PrepareMessage; -import tech.pegasys.pantheon.consensus.ibft.ibftmessage.ProposalMessage; -import tech.pegasys.pantheon.consensus.ibft.ibftmessage.RoundChangeMessage; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.CommitMessageData; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.NewRoundMessageData; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.PrepareMessageData; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.ProposalMessageData; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.RoundChangeMessageData; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.CommitPayload; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.MessageFactory; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.NewRoundPayload; @@ -37,6 +37,7 @@ import tech.pegasys.pantheon.ethereum.core.Block; import tech.pegasys.pantheon.ethereum.core.Hash; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; +import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection; import tech.pegasys.pantheon.ethereum.p2p.wire.DefaultMessage; import java.util.Collections; @@ -51,6 +52,7 @@ public class ValidatorPeer { private final Address nodeAddress; private final KeyPair nodeKeys; private final MessageFactory messageFactory; + private final PeerConnection peerConnection = new StubbedPeerConnection(); private List receivedMessages = Lists.newArrayList(); private final IbftController localNodeController; @@ -65,11 +67,16 @@ public ValidatorPeer( this.localNodeController = localNodeController; } + public Address getNodeAddress() { + return nodeAddress; + } + public SignedData injectProposal( final ConsensusRoundIdentifier rId, final Block block) { final SignedData payload = messageFactory.createSignedProposalPayload(rId, block); - injectMessage(ProposalMessage.create(payload)); + + injectMessage(ProposalMessageData.create(payload)); return payload; } @@ -77,7 +84,7 @@ public SignedData injectPrepare( final ConsensusRoundIdentifier rId, final Hash digest) { final SignedData payload = messageFactory.createSignedPreparePayload(rId, digest); - injectMessage(PrepareMessage.create(payload)); + injectMessage(PrepareMessageData.create(payload)); return payload; } @@ -86,7 +93,7 @@ public SignedData injectCommit( final Signature commitSeal = SECP256K1.sign(digest, nodeKeys); final SignedData payload = messageFactory.createSignedCommitPayload(rId, digest, commitSeal); - injectMessage(CommitMessage.create(payload)); + injectMessage(CommitMessageData.create(payload)); return payload; } @@ -97,7 +104,7 @@ public SignedData injectNewRound( final SignedData payload = messageFactory.createSignedNewRoundPayload(rId, roundChangeCertificate, proposalPayload); - injectMessage(NewRoundMessage.create(payload)); + injectMessage(NewRoundMessageData.create(payload)); return payload; } @@ -105,7 +112,7 @@ public SignedData injectRoundChange( final ConsensusRoundIdentifier rId, final Optional preparedCertificate) { final SignedData payload = messageFactory.createSignedRoundChangePayload(rId, preparedCertificate); - injectMessage(RoundChangeMessage.create(payload)); + injectMessage(RoundChangeMessageData.create(payload)); return payload; } @@ -122,7 +129,7 @@ public void clearReceivedMessages() { } public void injectMessage(final MessageData msgData) { - final DefaultMessage message = new DefaultMessage(null, msgData); + final DefaultMessage message = new DefaultMessage(peerConnection, msgData); localNodeController.handleMessageEvent( (IbftReceivedMessageEvent) IbftEvents.fromMessage(message)); } diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/IbftGossip.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/IbftGossip.java new file mode 100644 index 0000000000..f135feaf06 --- /dev/null +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/IbftGossip.java @@ -0,0 +1,108 @@ +/* + * Copyright 2018 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.consensus.ibft; + +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.CommitMessageData; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.IbftV2; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.NewRoundMessageData; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.PrepareMessageData; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.ProposalMessageData; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.RoundChangeMessageData; +import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.SignedData; +import tech.pegasys.pantheon.consensus.ibft.network.IbftMulticaster; +import tech.pegasys.pantheon.crypto.SECP256K1.Signature; +import tech.pegasys.pantheon.ethereum.core.Address; +import tech.pegasys.pantheon.ethereum.p2p.api.Message; +import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.Lists; + +/** Class responsible for rebroadcasting IBFT messages to known validators */ +public class IbftGossip { + private final IbftMulticaster peers; + + // Size of the seenMessages cache, should end up utilising 65bytes * this number + some meta data + private final int maxSeenMessages; + + // Set that starts evicting members when it hits capacity + private final Set seenMessages = + Collections.newSetFromMap( + new LinkedHashMap() { + @Override + protected boolean removeEldestEntry(final Map.Entry eldest) { + return size() > maxSeenMessages; + } + }); + + IbftGossip(final IbftMulticaster peers, final int maxSeenMessages) { + this.maxSeenMessages = maxSeenMessages; + this.peers = peers; + } + + /** + * Constructor that attaches gossip logic to a set of peers + * + * @param peers The always up to date set of connected peers that understand IBFT + */ + public IbftGossip(final IbftMulticaster peers) { + this(peers, 10_000); + } + + /** + * Retransmit a given IBFT message to other known validators nodes + * + * @param message The raw message to be gossiped + * @return Whether the message was rebroadcast or has been ignored as a repeat + */ + public boolean gossipMessage(final Message message) { + final MessageData messageData = message.getData(); + final SignedData signedData; + switch (messageData.getCode()) { + case IbftV2.PROPOSAL: + signedData = ProposalMessageData.fromMessageData(messageData).decode(); + break; + case IbftV2.PREPARE: + signedData = PrepareMessageData.fromMessageData(messageData).decode(); + break; + case IbftV2.COMMIT: + signedData = CommitMessageData.fromMessageData(messageData).decode(); + break; + case IbftV2.ROUND_CHANGE: + signedData = RoundChangeMessageData.fromMessageData(messageData).decode(); + break; + case IbftV2.NEW_ROUND: + signedData = NewRoundMessageData.fromMessageData(messageData).decode(); + break; + default: + throw new IllegalArgumentException( + "Received message does not conform to any recognised IBFT message structure."); + } + final Signature signature = signedData.getSignature(); + if (seenMessages.contains(signature)) { + return false; + } else { + final List
excludeAddressesList = + Lists.newArrayList( + message.getConnection().getPeer().getAddress(), signedData.getSender()); + peers.multicastToValidatorsExcept(messageData, excludeAddressesList); + seenMessages.add(signature); + return true; + } + } +} diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/IbftMessages.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/IbftMessages.java index 3eebbb8635..a6c4954ddc 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/IbftMessages.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/IbftMessages.java @@ -12,12 +12,12 @@ */ package tech.pegasys.pantheon.consensus.ibft; -import tech.pegasys.pantheon.consensus.ibft.ibftmessage.CommitMessage; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.CommitMessageData; import tech.pegasys.pantheon.consensus.ibft.ibftmessage.IbftV2; -import tech.pegasys.pantheon.consensus.ibft.ibftmessage.NewRoundMessage; -import tech.pegasys.pantheon.consensus.ibft.ibftmessage.PrepareMessage; -import tech.pegasys.pantheon.consensus.ibft.ibftmessage.ProposalMessage; -import tech.pegasys.pantheon.consensus.ibft.ibftmessage.RoundChangeMessage; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.NewRoundMessageData; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.PrepareMessageData; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.ProposalMessageData; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.RoundChangeMessageData; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.SignedData; import tech.pegasys.pantheon.ethereum.p2p.api.Message; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; @@ -29,19 +29,19 @@ public static SignedData fromMessage(final Message message) { switch (messageData.getCode()) { case IbftV2.PROPOSAL: - return ProposalMessage.fromMessage(messageData).decode(); + return ProposalMessageData.fromMessageData(messageData).decode(); case IbftV2.PREPARE: - return PrepareMessage.fromMessage(messageData).decode(); + return PrepareMessageData.fromMessageData(messageData).decode(); case IbftV2.COMMIT: - return CommitMessage.fromMessage(messageData).decode(); + return CommitMessageData.fromMessageData(messageData).decode(); case IbftV2.ROUND_CHANGE: - return RoundChangeMessage.fromMessage(messageData).decode(); + return RoundChangeMessageData.fromMessageData(messageData).decode(); case IbftV2.NEW_ROUND: - return NewRoundMessage.fromMessage(messageData).decode(); + return NewRoundMessageData.fromMessageData(messageData).decode(); default: throw new IllegalArgumentException( diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftevent/IbftEvents.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftevent/IbftEvents.java index ea91fb8f96..15f0770e0d 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftevent/IbftEvents.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftevent/IbftEvents.java @@ -17,7 +17,7 @@ /** Static helper functions for producing and working with IbftEvent objects */ public class IbftEvents { public static IbftEvent fromMessage(final Message message) { - return new IbftReceivedMessageEvent(message.getData()); + return new IbftReceivedMessageEvent(message); } public enum Type { diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftevent/IbftReceivedMessageEvent.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftevent/IbftReceivedMessageEvent.java index 332f559ea8..254a7879cb 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftevent/IbftReceivedMessageEvent.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftevent/IbftReceivedMessageEvent.java @@ -13,18 +13,18 @@ package tech.pegasys.pantheon.consensus.ibft.ibftevent; import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftEvents.Type; -import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; +import tech.pegasys.pantheon.ethereum.p2p.api.Message; public class IbftReceivedMessageEvent implements IbftEvent { - private final MessageData messageData; + private final Message message; - public IbftReceivedMessageEvent(final MessageData messageData) { - this.messageData = messageData; + public IbftReceivedMessageEvent(final Message message) { + this.message = message; } - public MessageData getMessageData() { - return messageData; + public Message getMessage() { + return message; } @Override diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/AbstractIbftMessage.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/AbstractIbftMessageData.java similarity index 64% rename from consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/AbstractIbftMessage.java rename to consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/AbstractIbftMessageData.java index 212dc85bcc..c019766603 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/AbstractIbftMessage.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/AbstractIbftMessageData.java @@ -12,6 +12,7 @@ */ package tech.pegasys.pantheon.consensus.ibft.ibftmessage; +import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.Payload; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.SignedData; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.wire.AbstractMessageData; @@ -19,29 +20,30 @@ import java.util.function.Function; -public abstract class AbstractIbftMessage extends AbstractMessageData { - protected AbstractIbftMessage(final BytesValue data) { +public abstract class AbstractIbftMessageData extends AbstractMessageData { + protected AbstractIbftMessageData(final BytesValue data) { super(data); } - public abstract SignedData decode(); + public abstract SignedData decode(); - protected static T fromMessage( - final MessageData message, + protected static T fromMessageData( + final MessageData messageData, final int messageCode, final Class clazz, final Function constructor) { - if (clazz.isInstance(message)) { + if (clazz.isInstance(messageData)) { @SuppressWarnings("unchecked") - T castMessage = (T) message; + T castMessage = (T) messageData; return castMessage; } - final int code = message.getCode(); + final int code = messageData.getCode(); if (code != messageCode) { throw new IllegalArgumentException( - String.format("Message has code %d and thus is not a %s", code, clazz.getSimpleName())); + String.format( + "MessageData has code %d and thus is not a %s", code, clazz.getSimpleName())); } - return constructor.apply(message.getData()); + return constructor.apply(messageData.getData()); } } diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/CommitMessage.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/CommitMessageData.java similarity index 72% rename from consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/CommitMessage.java rename to consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/CommitMessageData.java index 18aa65601e..eec605c15b 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/CommitMessage.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/CommitMessageData.java @@ -18,16 +18,17 @@ import tech.pegasys.pantheon.ethereum.rlp.RLP; import tech.pegasys.pantheon.util.bytes.BytesValue; -public class CommitMessage extends AbstractIbftMessage { +public class CommitMessageData extends AbstractIbftMessageData { private static final int MESSAGE_CODE = IbftV2.COMMIT; - private CommitMessage(final BytesValue data) { + private CommitMessageData(final BytesValue data) { super(data); } - public static CommitMessage fromMessage(final MessageData message) { - return fromMessage(message, MESSAGE_CODE, CommitMessage.class, CommitMessage::new); + public static CommitMessageData fromMessageData(final MessageData messageData) { + return fromMessageData( + messageData, MESSAGE_CODE, CommitMessageData.class, CommitMessageData::new); } @Override @@ -35,9 +36,9 @@ public SignedData decode() { return SignedData.readSignedCommitPayloadFrom(RLP.input(data)); } - public static CommitMessage create(final SignedData signedPayload) { + public static CommitMessageData create(final SignedData signedPayload) { - return new CommitMessage(signedPayload.encode()); + return new CommitMessageData(signedPayload.encode()); } @Override diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/NewRoundMessage.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/NewRoundMessageData.java similarity index 72% rename from consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/NewRoundMessage.java rename to consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/NewRoundMessageData.java index 5a39f15726..7cafec1db3 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/NewRoundMessage.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/NewRoundMessageData.java @@ -18,16 +18,17 @@ import tech.pegasys.pantheon.ethereum.rlp.RLP; import tech.pegasys.pantheon.util.bytes.BytesValue; -public class NewRoundMessage extends AbstractIbftMessage { +public class NewRoundMessageData extends AbstractIbftMessageData { private static final int MESSAGE_CODE = IbftV2.NEW_ROUND; - private NewRoundMessage(final BytesValue data) { + private NewRoundMessageData(final BytesValue data) { super(data); } - public static NewRoundMessage fromMessage(final MessageData message) { - return fromMessage(message, MESSAGE_CODE, NewRoundMessage.class, NewRoundMessage::new); + public static NewRoundMessageData fromMessageData(final MessageData messageData) { + return fromMessageData( + messageData, MESSAGE_CODE, NewRoundMessageData.class, NewRoundMessageData::new); } @Override @@ -35,9 +36,9 @@ public SignedData decode() { return SignedData.readSignedNewRoundPayloadFrom(RLP.input(data)); } - public static NewRoundMessage create(final SignedData signedPayload) { + public static NewRoundMessageData create(final SignedData signedPayload) { - return new NewRoundMessage(signedPayload.encode()); + return new NewRoundMessageData(signedPayload.encode()); } @Override diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/PrepareMessage.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/PrepareMessageData.java similarity index 72% rename from consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/PrepareMessage.java rename to consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/PrepareMessageData.java index 5549d85fe4..dfe0248f3e 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/PrepareMessage.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/PrepareMessageData.java @@ -18,16 +18,17 @@ import tech.pegasys.pantheon.ethereum.rlp.RLP; import tech.pegasys.pantheon.util.bytes.BytesValue; -public class PrepareMessage extends AbstractIbftMessage { +public class PrepareMessageData extends AbstractIbftMessageData { private static final int MESSAGE_CODE = IbftV2.PREPARE; - private PrepareMessage(final BytesValue data) { + private PrepareMessageData(final BytesValue data) { super(data); } - public static PrepareMessage fromMessage(final MessageData message) { - return fromMessage(message, MESSAGE_CODE, PrepareMessage.class, PrepareMessage::new); + public static PrepareMessageData fromMessageData(final MessageData messageData) { + return fromMessageData( + messageData, MESSAGE_CODE, PrepareMessageData.class, PrepareMessageData::new); } @Override @@ -35,9 +36,9 @@ public SignedData decode() { return SignedData.readSignedPreparePayloadFrom(RLP.input(data)); } - public static PrepareMessage create(final SignedData signedPayload) { + public static PrepareMessageData create(final SignedData signedPayload) { - return new PrepareMessage(signedPayload.encode()); + return new PrepareMessageData(signedPayload.encode()); } @Override diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/ProposalMessage.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/ProposalMessageData.java similarity index 72% rename from consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/ProposalMessage.java rename to consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/ProposalMessageData.java index 951e0562ad..b179b2a739 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/ProposalMessage.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/ProposalMessageData.java @@ -18,16 +18,17 @@ import tech.pegasys.pantheon.ethereum.rlp.RLP; import tech.pegasys.pantheon.util.bytes.BytesValue; -public class ProposalMessage extends AbstractIbftMessage { +public class ProposalMessageData extends AbstractIbftMessageData { private static final int MESSAGE_CODE = IbftV2.PROPOSAL; - private ProposalMessage(final BytesValue data) { + private ProposalMessageData(final BytesValue data) { super(data); } - public static ProposalMessage fromMessage(final MessageData message) { - return fromMessage(message, MESSAGE_CODE, ProposalMessage.class, ProposalMessage::new); + public static ProposalMessageData fromMessageData(final MessageData messageData) { + return fromMessageData( + messageData, MESSAGE_CODE, ProposalMessageData.class, ProposalMessageData::new); } @Override @@ -35,9 +36,9 @@ public SignedData decode() { return SignedData.readSignedProposalPayloadFrom(RLP.input(data)); } - public static ProposalMessage create(final SignedData signedPayload) { + public static ProposalMessageData create(final SignedData signedPayload) { - return new ProposalMessage(signedPayload.encode()); + return new ProposalMessageData(signedPayload.encode()); } @Override diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/RoundChangeMessage.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/RoundChangeMessageData.java similarity index 71% rename from consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/RoundChangeMessage.java rename to consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/RoundChangeMessageData.java index 6fcf934c91..40425bf8e3 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/RoundChangeMessage.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/RoundChangeMessageData.java @@ -18,16 +18,17 @@ import tech.pegasys.pantheon.ethereum.rlp.RLP; import tech.pegasys.pantheon.util.bytes.BytesValue; -public class RoundChangeMessage extends AbstractIbftMessage { +public class RoundChangeMessageData extends AbstractIbftMessageData { private static final int MESSAGE_CODE = IbftV2.ROUND_CHANGE; - private RoundChangeMessage(final BytesValue data) { + private RoundChangeMessageData(final BytesValue data) { super(data); } - public static RoundChangeMessage fromMessage(final MessageData message) { - return fromMessage(message, MESSAGE_CODE, RoundChangeMessage.class, RoundChangeMessage::new); + public static RoundChangeMessageData fromMessageData(final MessageData messageData) { + return fromMessageData( + messageData, MESSAGE_CODE, RoundChangeMessageData.class, RoundChangeMessageData::new); } @Override @@ -35,9 +36,9 @@ public SignedData decode() { return SignedData.readSignedRoundChangePayloadFrom(RLP.input(data)); } - public static RoundChangeMessage create(final SignedData signedPayload) { + public static RoundChangeMessageData create(final SignedData signedPayload) { - return new RoundChangeMessage(signedPayload.encode()); + return new RoundChangeMessageData(signedPayload.encode()); } @Override diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/network/IbftMulticaster.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/network/IbftMulticaster.java index bdb8350a44..4e61274f86 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/network/IbftMulticaster.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/network/IbftMulticaster.java @@ -12,9 +12,15 @@ */ package tech.pegasys.pantheon.consensus.ibft.network; +import tech.pegasys.pantheon.ethereum.core.Address; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; +import java.util.Collection; + public interface IbftMulticaster { void multicastToValidators(final MessageData message); + + void multicastToValidatorsExcept( + final MessageData message, final Collection
exceptAddresses); } diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/network/IbftNetworkPeers.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/network/IbftNetworkPeers.java index 92f10b0129..4482e68d43 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/network/IbftNetworkPeers.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/network/IbftNetworkPeers.java @@ -13,17 +13,15 @@ package tech.pegasys.pantheon.consensus.ibft.network; import tech.pegasys.pantheon.consensus.common.ValidatorProvider; -import tech.pegasys.pantheon.crypto.SECP256K1.PublicKey; import tech.pegasys.pantheon.ethereum.core.Address; -import tech.pegasys.pantheon.ethereum.core.Util; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection; import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected; -import tech.pegasys.pantheon.util.bytes.BytesValue; import java.util.Collection; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; @@ -43,12 +41,12 @@ public IbftNetworkPeers(final ValidatorProvider validatorProvider) { } public void peerAdded(final PeerConnection newConnection) { - final Address peerAddress = getAddressFrom(newConnection); + final Address peerAddress = newConnection.getPeer().getAddress(); peerConnections.put(peerAddress, newConnection); } public void peerRemoved(final PeerConnection removedConnection) { - final Address peerAddress = getAddressFrom(removedConnection); + final Address peerAddress = removedConnection.getPeer().getAddress(); peerConnections.remove(peerAddress); } @@ -58,6 +56,18 @@ public void multicastToValidators(final MessageData message) { sendMessageToSpecificAddresses(validators, message); } + @Override + public void multicastToValidatorsExcept( + final MessageData message, final Collection
exceptAddresses) { + final Collection
includedValidators = + validatorProvider + .getValidators() + .stream() + .filter(a -> !exceptAddresses.contains(a)) + .collect(Collectors.toSet()); + sendMessageToSpecificAddresses(includedValidators, message); + } + private void sendMessageToSpecificAddresses( final Collection
recipients, final MessageData message) { recipients @@ -73,10 +83,4 @@ private void sendMessageToSpecificAddresses( } }); } - - private Address getAddressFrom(final PeerConnection connection) { - final BytesValue peerNodeId = connection.getPeer().getNodeId(); - final PublicKey remotePublicKey = PublicKey.create(peerNodeId); - return Util.publicKeyToAddress(remotePublicKey); - } } diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftBlockHeightManager.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftBlockHeightManager.java index 8a0343272a..aa9193c7a3 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftBlockHeightManager.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftBlockHeightManager.java @@ -146,51 +146,53 @@ public void roundExpired(final RoundExpiry expire) { // Its possible the locally created RoundChange triggers the transmission of a NewRound // message - so it must be handled accordingly. - handleRoundChangeMessage(localRoundChange); + handleRoundChangePayload(localRoundChange); } - public void handleProposalMessage(final SignedData msg) { - LOG.info("Received a Proposal message."); - actionOrBufferMessage(msg, currentRound::handleProposalMessage, RoundState::setProposedBlock); + public void handleProposalPayload(final SignedData signedPayload) { + LOG.info("Received a Proposal Payload."); + actionOrBufferMessage( + signedPayload, currentRound::handleProposalMessage, RoundState::setProposedBlock); } - public void handlePrepareMessage(final SignedData msg) { - LOG.info("Received a prepare message."); - actionOrBufferMessage(msg, currentRound::handlePrepareMessage, RoundState::addPrepareMessage); + public void handlePreparePayload(final SignedData signedPayload) { + LOG.info("Received a prepare Payload."); + actionOrBufferMessage( + signedPayload, currentRound::handlePrepareMessage, RoundState::addPrepareMessage); } - public void handleCommitMessage(final SignedData msg) { - LOG.info("Received a commit message."); - actionOrBufferMessage(msg, currentRound::handleCommitMessage, RoundState::addCommitMessage); + public void handleCommitPayload(final SignedData payload) { + LOG.info("Received a commit Payload."); + actionOrBufferMessage(payload, currentRound::handleCommitMessage, RoundState::addCommitMessage); } private void actionOrBufferMessage( - final SignedData msg, + final SignedData msgData, final Consumer> inRoundHandler, final BiConsumer> buffer) { - final Payload payload = msg.getPayload(); + final Payload payload = msgData.getPayload(); final MessageAge messageAge = determineAgeOfPayload(payload); if (messageAge == CURRENT_ROUND) { - inRoundHandler.accept(msg); + inRoundHandler.accept(msgData); } else if (messageAge == FUTURE_ROUND) { final ConsensusRoundIdentifier msgRoundId = payload.getRoundIdentifier(); final RoundState roundstate = futureRoundStateBuffer.computeIfAbsent( msgRoundId.getRoundNumber(), k -> roundStateCreator.apply(msgRoundId)); - buffer.accept(roundstate, msg); + buffer.accept(roundstate, msgData); } } - public void handleRoundChangeMessage(final SignedData msg) { + public void handleRoundChangePayload(final SignedData signedPayload) { final Optional result = - roundChangeManager.appendRoundChangeMessage(msg); - final MessageAge messageAge = determineAgeOfPayload(msg.getPayload()); + roundChangeManager.appendRoundChangeMessage(signedPayload); + final MessageAge messageAge = determineAgeOfPayload(signedPayload.getPayload()); if (messageAge == PRIOR_ROUND) { - LOG.info("Received RoundChange Message for a prior round."); + LOG.info("Received RoundChange Payload for a prior round."); return; } - ConsensusRoundIdentifier targetRound = msg.getPayload().getRoundIdentifier(); - LOG.info("Received a RoundChange message for {}", targetRound.toString()); + final ConsensusRoundIdentifier targetRound = signedPayload.getPayload().getRoundIdentifier(); + LOG.info("Received a RoundChange Payload for {}", targetRound.toString()); if (result.isPresent()) { if (messageAge == FUTURE_ROUND) { @@ -218,17 +220,17 @@ private void startNewRound(final int roundNumber) { roundTimer.startTimer(currentRound.getRoundIdentifier()); } - public void handleNewRoundMessage(final SignedData msg) { - final NewRoundPayload payload = msg.getPayload(); + public void handleNewRoundPayload(final SignedData signedPayload) { + final NewRoundPayload payload = signedPayload.getPayload(); final MessageAge messageAge = determineAgeOfPayload(payload); if (messageAge == PRIOR_ROUND) { - LOG.info("Received NewRound Message for a prior round."); + LOG.info("Received NewRound Payload for a prior round."); return; } - LOG.info("Received NewRound Message for {}", payload.getRoundIdentifier().toString()); + LOG.info("Received NewRound Payload for {}", payload.getRoundIdentifier().toString()); - if (newRoundMessageValidator.validateNewRoundMessage(msg)) { + if (newRoundMessageValidator.validateNewRoundMessage(signedPayload)) { if (messageAge == FUTURE_ROUND) { startNewRound(payload.getRoundIdentifier().getRoundNumber()); } @@ -241,8 +243,8 @@ public long getChainHeight() { } private MessageAge determineAgeOfPayload(final Payload payload) { - int messageRoundNumber = payload.getRoundIdentifier().getRoundNumber(); - int currentRoundNumber = currentRound.getRoundIdentifier().getRoundNumber(); + final int messageRoundNumber = payload.getRoundIdentifier().getRoundNumber(); + final int currentRoundNumber = currentRound.getRoundIdentifier().getRoundNumber(); if (messageRoundNumber > currentRoundNumber) { return FUTURE_ROUND; } else if (messageRoundNumber == currentRoundNumber) { diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftController.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftController.java index 19983f7205..2f616a4059 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftController.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftController.java @@ -15,29 +15,27 @@ import static java.util.Collections.emptyList; import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier; +import tech.pegasys.pantheon.consensus.ibft.IbftGossip; import tech.pegasys.pantheon.consensus.ibft.ibftevent.BlockTimerExpiry; import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftReceivedMessageEvent; import tech.pegasys.pantheon.consensus.ibft.ibftevent.NewChainHead; import tech.pegasys.pantheon.consensus.ibft.ibftevent.RoundExpiry; -import tech.pegasys.pantheon.consensus.ibft.ibftmessage.CommitMessage; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.CommitMessageData; import tech.pegasys.pantheon.consensus.ibft.ibftmessage.IbftV2; -import tech.pegasys.pantheon.consensus.ibft.ibftmessage.NewRoundMessage; -import tech.pegasys.pantheon.consensus.ibft.ibftmessage.PrepareMessage; -import tech.pegasys.pantheon.consensus.ibft.ibftmessage.ProposalMessage; -import tech.pegasys.pantheon.consensus.ibft.ibftmessage.RoundChangeMessage; -import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.CommitPayload; -import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.NewRoundPayload; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.NewRoundMessageData; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.PrepareMessageData; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.ProposalMessageData; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.RoundChangeMessageData; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.Payload; -import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.PreparePayload; -import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.ProposalPayload; -import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.RoundChangePayload; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.SignedData; import tech.pegasys.pantheon.ethereum.chain.Blockchain; import tech.pegasys.pantheon.ethereum.core.BlockHeader; +import tech.pegasys.pantheon.ethereum.p2p.api.Message; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; @@ -50,26 +48,34 @@ public class IbftController { private final Blockchain blockchain; private final IbftFinalState ibftFinalState; private final IbftBlockHeightManagerFactory ibftBlockHeightManagerFactory; - private final Map> futureMessages; + private final Map> futureMessages; private IbftBlockHeightManager currentHeightManager; + private final IbftGossip gossiper; public IbftController( final Blockchain blockchain, final IbftFinalState ibftFinalState, final IbftBlockHeightManagerFactory ibftBlockHeightManagerFactory) { - this(blockchain, ibftFinalState, ibftBlockHeightManagerFactory, Maps.newHashMap()); + this( + blockchain, + ibftFinalState, + ibftBlockHeightManagerFactory, + Maps.newHashMap(), + new IbftGossip(ibftFinalState.getPeers())); } @VisibleForTesting - IbftController( + public IbftController( final Blockchain blockchain, final IbftFinalState ibftFinalState, final IbftBlockHeightManagerFactory ibftBlockHeightManagerFactory, - final Map> futureMessages) { + final Map> futureMessages, + final IbftGossip gossiper) { this.blockchain = blockchain; this.ibftFinalState = ibftFinalState; this.ibftBlockHeightManagerFactory = ibftBlockHeightManagerFactory; this.futureMessages = futureMessages; + this.gossiper = gossiper; } public void start() { @@ -77,48 +83,45 @@ public void start() { } public void handleMessageEvent(final IbftReceivedMessageEvent msg) { - handleMessage(msg.getMessageData()); + handleMessage(msg.getMessage()); } - private void handleMessage(final MessageData messageData) { + private void handleMessage(final Message message) { + final MessageData messageData = message.getData(); switch (messageData.getCode()) { case IbftV2.PROPOSAL: - final SignedData proposalMsg = - ProposalMessage.fromMessage(messageData).decode(); - if (processMessage(proposalMsg, messageData)) { - currentHeightManager.handleProposalMessage(proposalMsg); - } + consumeMessage( + message, + ProposalMessageData.fromMessageData(messageData).decode(), + currentHeightManager::handleProposalPayload); break; case IbftV2.PREPARE: - final SignedData prepareMsg = - PrepareMessage.fromMessage(messageData).decode(); - if (processMessage(prepareMsg, messageData)) { - currentHeightManager.handlePrepareMessage(prepareMsg); - } + consumeMessage( + message, + PrepareMessageData.fromMessageData(messageData).decode(), + currentHeightManager::handlePreparePayload); break; case IbftV2.COMMIT: - final SignedData commitMsg = CommitMessage.fromMessage(messageData).decode(); - if (processMessage(commitMsg, messageData)) { - currentHeightManager.handleCommitMessage(commitMsg); - } + consumeMessage( + message, + CommitMessageData.fromMessageData(messageData).decode(), + currentHeightManager::handleCommitPayload); break; case IbftV2.ROUND_CHANGE: - final SignedData roundChangeMsg = - RoundChangeMessage.fromMessage(messageData).decode(); - if (processMessage(roundChangeMsg, messageData)) { - currentHeightManager.handleRoundChangeMessage(roundChangeMsg); - } + consumeMessage( + message, + RoundChangeMessageData.fromMessageData(messageData).decode(), + currentHeightManager::handleRoundChangePayload); break; case IbftV2.NEW_ROUND: - final SignedData newRoundMsg = - NewRoundMessage.fromMessage(messageData).decode(); - if (processMessage(newRoundMsg, messageData)) { - currentHeightManager.handleNewRoundMessage(newRoundMsg); - } + consumeMessage( + message, + NewRoundMessageData.fromMessageData(messageData).decode(), + currentHeightManager::handleNewRoundPayload); break; default: @@ -127,6 +130,16 @@ private void handleMessage(final MessageData messageData) { } } + private

void consumeMessage( + final Message message, + final SignedData

signedPayload, + final Consumer> handleMessage) { + if (processMessage(signedPayload, message)) { + gossiper.gossipMessage(message); + handleMessage.accept(signedPayload); + } + } + public void handleNewBlockEvent(final NewChainHead newChainHead) { startNewHeightManager(newChainHead.getNewChainHeadHeader()); } @@ -151,20 +164,19 @@ private void startNewHeightManager(final BlockHeader parentHeader) { currentHeightManager = ibftBlockHeightManagerFactory.create(parentHeader); currentHeightManager.start(); final long newChainHeight = currentHeightManager.getChainHeight(); - List orDefault = futureMessages.getOrDefault(newChainHeight, emptyList()); + final List orDefault = futureMessages.getOrDefault(newChainHeight, emptyList()); orDefault.forEach(this::handleMessage); futureMessages.remove(newChainHeight); } - private boolean processMessage( - final SignedData msg, final MessageData rawMsg) { + private boolean processMessage(final SignedData msg, final Message rawMsg) { final ConsensusRoundIdentifier msgRoundIdentifier = msg.getPayload().getRoundIdentifier(); if (isMsgForCurrentHeight(msgRoundIdentifier)) { return isMsgFromKnownValidator(msg); } else if (isMsgForFutureChainHeight(msgRoundIdentifier)) { addMessageToFutureMessageBuffer(msgRoundIdentifier.getSequenceNumber(), rawMsg); } else { - LOG.info("IBFT message discarded as it is not for the current block height"); + LOG.info("IBFT message discarded as it is from a previous block height"); } return false; } @@ -181,7 +193,7 @@ private boolean isMsgForFutureChainHeight(final ConsensusRoundIdentifier roundId return roundIdentifier.getSequenceNumber() > currentHeightManager.getChainHeight(); } - private void addMessageToFutureMessageBuffer(final long chainHeight, final MessageData rawMsg) { + private void addMessageToFutureMessageBuffer(final long chainHeight, final Message rawMsg) { if (!futureMessages.containsKey(chainHeight)) { futureMessages.put(chainHeight, Lists.newArrayList()); } diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftMessageTransmitter.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftMessageTransmitter.java index 4649df1920..6548ead145 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftMessageTransmitter.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftMessageTransmitter.java @@ -13,11 +13,11 @@ package tech.pegasys.pantheon.consensus.ibft.statemachine; import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier; -import tech.pegasys.pantheon.consensus.ibft.ibftmessage.CommitMessage; -import tech.pegasys.pantheon.consensus.ibft.ibftmessage.NewRoundMessage; -import tech.pegasys.pantheon.consensus.ibft.ibftmessage.PrepareMessage; -import tech.pegasys.pantheon.consensus.ibft.ibftmessage.ProposalMessage; -import tech.pegasys.pantheon.consensus.ibft.ibftmessage.RoundChangeMessage; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.CommitMessageData; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.NewRoundMessageData; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.PrepareMessageData; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.ProposalMessageData; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.RoundChangeMessageData; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.CommitPayload; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.MessageFactory; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.NewRoundPayload; @@ -49,7 +49,7 @@ public void multicastProposal(final ConsensusRoundIdentifier roundIdentifier, fi final SignedData signedPayload = messageFactory.createSignedProposalPayload(roundIdentifier, block); - final ProposalMessage message = ProposalMessage.create(signedPayload); + final ProposalMessageData message = ProposalMessageData.create(signedPayload); multicaster.multicastToValidators(message); } @@ -58,7 +58,7 @@ public void multicastPrepare(final ConsensusRoundIdentifier roundIdentifier, fin final SignedData signedPayload = messageFactory.createSignedPreparePayload(roundIdentifier, digest); - final PrepareMessage message = PrepareMessage.create(signedPayload); + final PrepareMessageData message = PrepareMessageData.create(signedPayload); multicaster.multicastToValidators(message); } @@ -70,7 +70,7 @@ public void multicastCommit( final SignedData signedPayload = messageFactory.createSignedCommitPayload(roundIdentifier, digest, commitSeal); - final CommitMessage message = CommitMessage.create(signedPayload); + final CommitMessageData message = CommitMessageData.create(signedPayload); multicaster.multicastToValidators(message); } @@ -82,7 +82,7 @@ public void multicastRoundChange( final SignedData signedPayload = messageFactory.createSignedRoundChangePayload(roundIdentifier, preparedCertificate); - final RoundChangeMessage message = RoundChangeMessage.create(signedPayload); + final RoundChangeMessageData message = RoundChangeMessageData.create(signedPayload); multicaster.multicastToValidators(message); } @@ -96,7 +96,7 @@ public void multicastNewRound( messageFactory.createSignedNewRoundPayload( roundIdentifier, roundChangeCertificate, proposalPayload); - final NewRoundMessage message = NewRoundMessage.create(signedPayload); + final NewRoundMessageData message = NewRoundMessageData.create(signedPayload); multicaster.multicastToValidators(message); } diff --git a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/IbftGossipTest.java b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/IbftGossipTest.java new file mode 100644 index 0000000000..b1b366444c --- /dev/null +++ b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/IbftGossipTest.java @@ -0,0 +1,194 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.consensus.ibft; + +import static com.google.common.collect.Lists.newArrayList; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.CommitMessageData; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.NewRoundMessageData; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.PrepareMessageData; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.ProposalMessageData; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.RoundChangeMessageData; +import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.Payload; +import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.ProposalPayload; +import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.SignedData; +import tech.pegasys.pantheon.consensus.ibft.network.IbftNetworkPeers; +import tech.pegasys.pantheon.consensus.ibft.network.MockPeerFactory; +import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair; +import tech.pegasys.pantheon.ethereum.core.Address; +import tech.pegasys.pantheon.ethereum.core.AddressHelpers; +import tech.pegasys.pantheon.ethereum.p2p.api.Message; +import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; +import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection; +import tech.pegasys.pantheon.ethereum.p2p.wire.DefaultMessage; + +import java.util.function.Function; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class IbftGossipTest { + private IbftGossip ibftGossip; + @Mock private IbftNetworkPeers ibftNetworkPeers; + private PeerConnection peerConnection; + private static final Address senderAddress = AddressHelpers.ofValue(9); + + @Before + public void setup() { + ibftGossip = new IbftGossip(ibftNetworkPeers, 10); + peerConnection = MockPeerFactory.create(senderAddress); + } + + private

void assertRebroadcastToAllExceptSignerAndSender( + final Function> createPayload, + final Function, MessageData> createMessageData) { + final KeyPair keypair = KeyPair.generate(); + final SignedData

payload = createPayload.apply(keypair); + final MessageData messageData = createMessageData.apply(payload); + final Message message = new DefaultMessage(peerConnection, messageData); + + final boolean gossipResult = ibftGossip.gossipMessage(message); + assertThat(gossipResult).isTrue(); + verify(ibftNetworkPeers) + .multicastToValidatorsExcept(messageData, newArrayList(senderAddress, payload.getSender())); + } + + private

void assertRebroadcastOnlyOnce( + final Function> createPayload, + final Function, MessageData> createMessageData) { + final KeyPair keypair = KeyPair.generate(); + final SignedData

payload = createPayload.apply(keypair); + final MessageData messageData = createMessageData.apply(payload); + final Message message = new DefaultMessage(peerConnection, messageData); + + final boolean gossip1Result = ibftGossip.gossipMessage(message); + final boolean gossip2Result = ibftGossip.gossipMessage(message); + assertThat(gossip1Result).isTrue(); + assertThat(gossip2Result).isFalse(); + verify(ibftNetworkPeers, times(1)) + .multicastToValidatorsExcept(messageData, newArrayList(senderAddress, payload.getSender())); + } + + @Test + public void assertRebroadcastsProposalToAllExceptSignerAndSender() { + assertRebroadcastToAllExceptSignerAndSender( + TestHelpers::createSignedProposalPayload, ProposalMessageData::create); + } + + @Test + public void assertRebroadcastsProposalOnlyOnce() { + assertRebroadcastOnlyOnce( + TestHelpers::createSignedProposalPayload, ProposalMessageData::create); + } + + @Test + public void assertRebroadcastsPrepareToAllExceptSignerAndSender() { + assertRebroadcastToAllExceptSignerAndSender( + TestHelpers::createSignedPreparePayload, PrepareMessageData::create); + } + + @Test + public void assertRebroadcastsPrepareOnlyOnce() { + assertRebroadcastOnlyOnce(TestHelpers::createSignedPreparePayload, PrepareMessageData::create); + } + + @Test + public void assertRebroadcastsCommitToAllExceptSignerAndSender() { + assertRebroadcastToAllExceptSignerAndSender( + TestHelpers::createSignedCommitPayload, CommitMessageData::create); + } + + @Test + public void assertRebroadcastsCommitOnlyOnce() { + assertRebroadcastOnlyOnce(TestHelpers::createSignedCommitPayload, CommitMessageData::create); + } + + @Test + public void assertRebroadcastsRoundChangeToAllExceptSignerAndSender() { + assertRebroadcastToAllExceptSignerAndSender( + TestHelpers::createSignedRoundChangePayload, RoundChangeMessageData::create); + } + + @Test + public void assertRebroadcastsRoundChangeOnlyOnce() { + assertRebroadcastOnlyOnce( + TestHelpers::createSignedRoundChangePayload, RoundChangeMessageData::create); + } + + @Test + public void assertRebroadcastsNewRoundToAllExceptSignerAndSender() { + assertRebroadcastToAllExceptSignerAndSender( + TestHelpers::createSignedNewRoundPayload, NewRoundMessageData::create); + } + + @Test + public void assertRebroadcastsNewRoundOnlyOnce() { + assertRebroadcastOnlyOnce( + TestHelpers::createSignedNewRoundPayload, NewRoundMessageData::create); + } + + @Test + public void evictMessageRecordAtCapacity() { + final KeyPair keypair = KeyPair.generate(); + final SignedData payload = + TestHelpers.createSignedProposalPayloadWithRound(keypair, 0); + final MessageData messageData = ProposalMessageData.create(payload); + final Message message = new DefaultMessage(peerConnection, messageData); + final boolean gossip1Result = ibftGossip.gossipMessage(message); + final boolean gossip2Result = ibftGossip.gossipMessage(message); + assertThat(gossip1Result).isTrue(); + assertThat(gossip2Result).isFalse(); + verify(ibftNetworkPeers, times(1)) + .multicastToValidatorsExcept(messageData, newArrayList(senderAddress, payload.getSender())); + + for (int i = 1; i <= 9; i++) { + final SignedData nextPayload = + TestHelpers.createSignedProposalPayloadWithRound(keypair, i); + final MessageData nextMessageData = ProposalMessageData.create(nextPayload); + final Message nextMessage = new DefaultMessage(peerConnection, nextMessageData); + final boolean nextGossipResult = ibftGossip.gossipMessage(nextMessage); + assertThat(nextGossipResult).isTrue(); + } + + final boolean gossip3Result = ibftGossip.gossipMessage(message); + assertThat(gossip3Result).isFalse(); + verify(ibftNetworkPeers, times(1)) + .multicastToValidatorsExcept(messageData, newArrayList(senderAddress, payload.getSender())); + + { + final SignedData nextPayload = + TestHelpers.createSignedProposalPayloadWithRound(keypair, 10); + final MessageData nextMessageData = ProposalMessageData.create(nextPayload); + final Message nextMessage = new DefaultMessage(peerConnection, nextMessageData); + final boolean nextGossipResult = ibftGossip.gossipMessage(nextMessage); + assertThat(nextGossipResult).isTrue(); + } + + final boolean gossip4Result = ibftGossip.gossipMessage(message); + assertThat(gossip4Result).isTrue(); + verify(ibftNetworkPeers, times(2)) + .multicastToValidatorsExcept(messageData, newArrayList(senderAddress, payload.getSender())); + + final boolean gossip5Result = ibftGossip.gossipMessage(message); + assertThat(gossip5Result).isFalse(); + verify(ibftNetworkPeers, times(2)) + .multicastToValidatorsExcept(messageData, newArrayList(senderAddress, payload.getSender())); + } +} diff --git a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/TestHelpers.java b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/TestHelpers.java index fb208e7357..2104acec1d 100644 --- a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/TestHelpers.java +++ b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/TestHelpers.java @@ -12,12 +12,28 @@ */ package tech.pegasys.pantheon.consensus.ibft; +import static com.google.common.collect.Lists.newArrayList; +import static java.util.Collections.singletonList; + +import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.CommitPayload; +import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.MessageFactory; +import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.NewRoundPayload; +import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.PreparePayload; +import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.ProposalPayload; +import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.RoundChangeCertificate; +import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.RoundChangePayload; +import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.SignedData; +import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair; +import tech.pegasys.pantheon.crypto.SECP256K1.Signature; import tech.pegasys.pantheon.ethereum.core.Address; +import tech.pegasys.pantheon.ethereum.core.AddressHelpers; import tech.pegasys.pantheon.ethereum.core.Block; import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator; import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator.BlockOptions; +import tech.pegasys.pantheon.ethereum.core.Hash; import tech.pegasys.pantheon.util.bytes.BytesValue; +import java.math.BigInteger; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -45,4 +61,53 @@ public static Block createProposalBlock(final List

validators, final in .setBlockHashFunction(IbftBlockHashing::calculateDataHashForCommittedSeal); return new BlockDataGenerator().block(blockOptions); } + + public static SignedData createSignedProposalPayload(final KeyPair signerKeys) { + return createSignedProposalPayloadWithRound(signerKeys, 0xFEDCBA98); + } + + public static SignedData createSignedProposalPayloadWithRound( + final KeyPair signerKeys, final int round) { + final MessageFactory messageFactory = new MessageFactory(signerKeys); + final ConsensusRoundIdentifier roundIdentifier = + new ConsensusRoundIdentifier(0x1234567890ABCDEFL, round); + final Block block = + TestHelpers.createProposalBlock(singletonList(AddressHelpers.ofValue(1)), 0); + return messageFactory.createSignedProposalPayload(roundIdentifier, block); + } + + public static SignedData createSignedPreparePayload(final KeyPair signerKeys) { + final MessageFactory messageFactory = new MessageFactory(signerKeys); + final ConsensusRoundIdentifier roundIdentifier = + new ConsensusRoundIdentifier(0x1234567890ABCDEFL, 0xFEDCBA98); + return messageFactory.createSignedPreparePayload( + roundIdentifier, Hash.fromHexStringLenient("0")); + } + + public static SignedData createSignedCommitPayload(final KeyPair signerKeys) { + final MessageFactory messageFactory = new MessageFactory(signerKeys); + final ConsensusRoundIdentifier roundIdentifier = + new ConsensusRoundIdentifier(0x1234567890ABCDEFL, 0xFEDCBA98); + return messageFactory.createSignedCommitPayload( + roundIdentifier, + Hash.fromHexStringLenient("0"), + Signature.create(BigInteger.ONE, BigInteger.TEN, (byte) 0)); + } + + public static SignedData createSignedRoundChangePayload( + final KeyPair signerKeys) { + final MessageFactory messageFactory = new MessageFactory(signerKeys); + final ConsensusRoundIdentifier roundIdentifier = + new ConsensusRoundIdentifier(0x1234567890ABCDEFL, 0xFEDCBA98); + return messageFactory.createSignedRoundChangePayload(roundIdentifier, Optional.empty()); + } + + public static SignedData createSignedNewRoundPayload(final KeyPair signerKeys) { + final MessageFactory messageFactory = new MessageFactory(signerKeys); + final ConsensusRoundIdentifier roundIdentifier = + new ConsensusRoundIdentifier(0x1234567890ABCDEFL, 0xFEDCBA98); + final SignedData proposalPayload = createSignedProposalPayload(signerKeys); + return messageFactory.createSignedNewRoundPayload( + roundIdentifier, new RoundChangeCertificate(newArrayList()), proposalPayload); + } } diff --git a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/CommitMessageTest.java b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/CommitMessageTest.java index 01ef9a5bb6..7471b2d3d6 100644 --- a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/CommitMessageTest.java +++ b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/CommitMessageTest.java @@ -32,12 +32,12 @@ public class CommitMessageTest { @Mock private SignedData commitPayload; @Mock private BytesValue messageBytes; @Mock private MessageData messageData; - @Mock private CommitMessage commitMessage; + @Mock private CommitMessageData commitMessage; @Test public void createMessageFromCommitMessageData() { when(commitPayload.encode()).thenReturn(messageBytes); - CommitMessage commitMessage = CommitMessage.create(commitPayload); + CommitMessageData commitMessage = CommitMessageData.create(commitPayload); assertThat(commitMessage.getData()).isEqualTo(messageBytes); assertThat(commitMessage.getCode()).isEqualTo(IbftV2.COMMIT); @@ -46,7 +46,7 @@ public void createMessageFromCommitMessageData() { @Test public void createMessageFromCommitMessage() { - CommitMessage message = CommitMessage.fromMessage(commitMessage); + CommitMessageData message = CommitMessageData.fromMessageData(commitMessage); assertThat(message).isSameAs(commitMessage); } @@ -54,7 +54,7 @@ public void createMessageFromCommitMessage() { public void createMessageFromGenericMessageData() { when(messageData.getData()).thenReturn(messageBytes); when(messageData.getCode()).thenReturn(IbftV2.COMMIT); - CommitMessage commitMessage = CommitMessage.fromMessage(messageData); + CommitMessageData commitMessage = CommitMessageData.fromMessageData(messageData); assertThat(commitMessage.getData()).isEqualTo(messageData.getData()); assertThat(commitMessage.getCode()).isEqualTo(IbftV2.COMMIT); @@ -63,8 +63,8 @@ public void createMessageFromGenericMessageData() { @Test public void createMessageFailsWhenIncorrectMessageCode() { when(messageData.getCode()).thenReturn(42); - assertThatThrownBy(() -> CommitMessage.fromMessage(messageData)) + assertThatThrownBy(() -> CommitMessageData.fromMessageData(messageData)) .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("Message has code 42 and thus is not a CommitMessage"); + .hasMessageContaining("MessageData has code 42 and thus is not a CommitMessageData"); } } diff --git a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/NewRoundMessageTest.java b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/NewRoundMessageTest.java index e43cfc3b14..fe1511ff5e 100644 --- a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/NewRoundMessageTest.java +++ b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/NewRoundMessageTest.java @@ -32,12 +32,12 @@ public class NewRoundMessageTest { @Mock private SignedData newRoundPayload; @Mock private BytesValue messageBytes; @Mock private MessageData messageData; - @Mock private NewRoundMessage newRoundMessage; + @Mock private NewRoundMessageData newRoundMessage; @Test public void createMessageFromNewRoundChangeMessageData() { when(newRoundPayload.encode()).thenReturn(messageBytes); - NewRoundMessage prepareMessage = NewRoundMessage.create(newRoundPayload); + NewRoundMessageData prepareMessage = NewRoundMessageData.create(newRoundPayload); assertThat(prepareMessage.getData()).isEqualTo(messageBytes); assertThat(prepareMessage.getCode()).isEqualTo(IbftV2.NEW_ROUND); @@ -46,7 +46,7 @@ public void createMessageFromNewRoundChangeMessageData() { @Test public void createMessageFromNewRoundMessage() { - NewRoundMessage message = NewRoundMessage.fromMessage(newRoundMessage); + NewRoundMessageData message = NewRoundMessageData.fromMessageData(newRoundMessage); assertThat(message).isSameAs(newRoundMessage); } @@ -54,7 +54,7 @@ public void createMessageFromNewRoundMessage() { public void createMessageFromGenericMessageData() { when(messageData.getData()).thenReturn(messageBytes); when(messageData.getCode()).thenReturn(IbftV2.NEW_ROUND); - NewRoundMessage newRoundMessage = NewRoundMessage.fromMessage(messageData); + NewRoundMessageData newRoundMessage = NewRoundMessageData.fromMessageData(messageData); assertThat(newRoundMessage.getData()).isEqualTo(messageData.getData()); assertThat(newRoundMessage.getCode()).isEqualTo(IbftV2.NEW_ROUND); @@ -63,8 +63,8 @@ public void createMessageFromGenericMessageData() { @Test public void createMessageFailsWhenIncorrectMessageCode() { when(messageData.getCode()).thenReturn(42); - assertThatThrownBy(() -> NewRoundMessage.fromMessage(messageData)) + assertThatThrownBy(() -> NewRoundMessageData.fromMessageData(messageData)) .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("Message has code 42 and thus is not a NewRoundMessage"); + .hasMessageContaining("MessageData has code 42 and thus is not a NewRoundMessageData"); } } diff --git a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/PrepareMessageTest.java b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/PrepareMessageTest.java index 5b30449977..363c19d14e 100644 --- a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/PrepareMessageTest.java +++ b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/PrepareMessageTest.java @@ -32,12 +32,12 @@ public class PrepareMessageTest { @Mock private SignedData preparePayload; @Mock private BytesValue messageBytes; @Mock private MessageData messageData; - @Mock private PrepareMessage prepareMessage; + @Mock private PrepareMessageData prepareMessage; @Test public void createMessageFromPrepareMessageData() { when(preparePayload.encode()).thenReturn(messageBytes); - PrepareMessage prepareMessage = PrepareMessage.create(preparePayload); + PrepareMessageData prepareMessage = PrepareMessageData.create(preparePayload); assertThat(prepareMessage.getData()).isEqualTo(messageBytes); assertThat(prepareMessage.getCode()).isEqualTo(IbftV2.PREPARE); @@ -46,7 +46,7 @@ public void createMessageFromPrepareMessageData() { @Test public void createMessageFromPrepareMessage() { - PrepareMessage message = PrepareMessage.fromMessage(prepareMessage); + PrepareMessageData message = PrepareMessageData.fromMessageData(prepareMessage); assertThat(message).isSameAs(prepareMessage); } @@ -54,7 +54,7 @@ public void createMessageFromPrepareMessage() { public void createMessageFromGenericMessageData() { when(messageData.getData()).thenReturn(messageBytes); when(messageData.getCode()).thenReturn(IbftV2.PREPARE); - PrepareMessage prepareMessage = PrepareMessage.fromMessage(messageData); + PrepareMessageData prepareMessage = PrepareMessageData.fromMessageData(messageData); assertThat(prepareMessage.getData()).isEqualTo(messageData.getData()); assertThat(prepareMessage.getCode()).isEqualTo(IbftV2.PREPARE); @@ -63,8 +63,8 @@ public void createMessageFromGenericMessageData() { @Test public void createMessageFailsWhenIncorrectMessageCode() { when(messageData.getCode()).thenReturn(42); - assertThatThrownBy(() -> PrepareMessage.fromMessage(messageData)) + assertThatThrownBy(() -> PrepareMessageData.fromMessageData(messageData)) .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("Message has code 42 and thus is not a PrepareMessage"); + .hasMessageContaining("MessageData has code 42 and thus is not a PrepareMessageData"); } } diff --git a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/PrePrepareMessageTest.java b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/ProposalMessageTest.java similarity index 75% rename from consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/PrePrepareMessageTest.java rename to consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/ProposalMessageTest.java index 44fdecf769..9480e2bfe7 100644 --- a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/PrePrepareMessageTest.java +++ b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/ProposalMessageTest.java @@ -28,25 +28,25 @@ import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) -public class PrePrepareMessageTest { - @Mock private SignedData prePrepareMessageData; +public class ProposalMessageTest { + @Mock private SignedData proposalMessageData; @Mock private BytesValue messageBytes; @Mock private MessageData messageData; - @Mock private ProposalMessage proposalMessage; + @Mock private ProposalMessageData proposalMessage; @Test public void createMessageFromPrePrepareMessageData() { - when(prePrepareMessageData.encode()).thenReturn(messageBytes); - ProposalMessage proposalMessage = ProposalMessage.create(prePrepareMessageData); + when(proposalMessageData.encode()).thenReturn(messageBytes); + final ProposalMessageData proposalMessage = ProposalMessageData.create(proposalMessageData); assertThat(proposalMessage.getData()).isEqualTo(messageBytes); assertThat(proposalMessage.getCode()).isEqualTo(IbftV2.PROPOSAL); - verify(prePrepareMessageData).encode(); + verify(proposalMessageData).encode(); } @Test public void createMessageFromPrePrepareMessage() { - ProposalMessage message = ProposalMessage.fromMessage(proposalMessage); + final ProposalMessageData message = ProposalMessageData.fromMessageData(proposalMessage); assertThat(message).isSameAs(proposalMessage); } @@ -54,7 +54,7 @@ public void createMessageFromPrePrepareMessage() { public void createMessageFromGenericMessageData() { when(messageData.getCode()).thenReturn(IbftV2.PROPOSAL); when(messageData.getData()).thenReturn(messageBytes); - ProposalMessage proposalMessage = ProposalMessage.fromMessage(messageData); + final ProposalMessageData proposalMessage = ProposalMessageData.fromMessageData(messageData); assertThat(proposalMessage.getData()).isEqualTo(messageData.getData()); assertThat(proposalMessage.getCode()).isEqualTo(IbftV2.PROPOSAL); @@ -63,8 +63,8 @@ public void createMessageFromGenericMessageData() { @Test public void createMessageFailsWhenIncorrectMessageCode() { when(messageData.getCode()).thenReturn(42); - assertThatThrownBy(() -> ProposalMessage.fromMessage(messageData)) + assertThatThrownBy(() -> ProposalMessageData.fromMessageData(messageData)) .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("Message has code 42 and thus is not a ProposalMessage"); + .hasMessageContaining("MessageData has code 42 and thus is not a ProposalMessageData"); } } diff --git a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/RoundChangeMessageTest.java b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/RoundChangeMessageTest.java index 71bfd6fbcb..59686579bb 100644 --- a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/RoundChangeMessageTest.java +++ b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/ibftmessage/RoundChangeMessageTest.java @@ -32,12 +32,12 @@ public class RoundChangeMessageTest { @Mock private SignedData roundChangePayload; @Mock private BytesValue messageBytes; @Mock private MessageData messageData; - @Mock private RoundChangeMessage roundChangeMessage; + @Mock private RoundChangeMessageData roundChangeMessage; @Test public void createMessageFromRoundChangeMessageData() { when(roundChangePayload.encode()).thenReturn(messageBytes); - RoundChangeMessage roundChangeMessage = RoundChangeMessage.create(roundChangePayload); + RoundChangeMessageData roundChangeMessage = RoundChangeMessageData.create(roundChangePayload); assertThat(roundChangeMessage.getData()).isEqualTo(messageBytes); assertThat(roundChangeMessage.getCode()).isEqualTo(IbftV2.ROUND_CHANGE); @@ -46,7 +46,7 @@ public void createMessageFromRoundChangeMessageData() { @Test public void createMessageFromRoundChangeMessage() { - RoundChangeMessage message = RoundChangeMessage.fromMessage(roundChangeMessage); + RoundChangeMessageData message = RoundChangeMessageData.fromMessageData(roundChangeMessage); assertThat(message).isSameAs(roundChangeMessage); } @@ -54,7 +54,7 @@ public void createMessageFromRoundChangeMessage() { public void createMessageFromGenericMessageData() { when(messageData.getData()).thenReturn(messageBytes); when(messageData.getCode()).thenReturn(IbftV2.ROUND_CHANGE); - RoundChangeMessage roundChangeMessage = RoundChangeMessage.fromMessage(messageData); + RoundChangeMessageData roundChangeMessage = RoundChangeMessageData.fromMessageData(messageData); assertThat(roundChangeMessage.getData()).isEqualTo(messageData.getData()); assertThat(roundChangeMessage.getCode()).isEqualTo(IbftV2.ROUND_CHANGE); @@ -63,8 +63,8 @@ public void createMessageFromGenericMessageData() { @Test public void createMessageFailsWhenIncorrectMessageCode() { when(messageData.getCode()).thenReturn(42); - assertThatThrownBy(() -> RoundChangeMessage.fromMessage(messageData)) + assertThatThrownBy(() -> RoundChangeMessageData.fromMessageData(messageData)) .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("Message has code 42 and thus is not a RoundChangeMessage"); + .hasMessageContaining("MessageData has code 42 and thus is not a RoundChangeMessageData"); } } diff --git a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/network/IbftNetworkPeersTest.java b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/network/IbftNetworkPeersTest.java index 3609bdc829..f077ef790a 100644 --- a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/network/IbftNetworkPeersTest.java +++ b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/network/IbftNetworkPeersTest.java @@ -12,6 +12,7 @@ */ package tech.pegasys.pantheon.consensus.ibft.network; +import static com.google.common.collect.Lists.newArrayList; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -33,7 +34,6 @@ import java.math.BigInteger; import java.util.List; -import com.google.common.collect.Lists; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -42,21 +42,22 @@ @RunWith(MockitoJUnitRunner.class) public class IbftNetworkPeersTest { - private final List
validators = Lists.newArrayList(); - private final List publicKeys = Lists.newArrayList(); + private final List
validators = newArrayList(); + private final List publicKeys = newArrayList(); - private final List peerConnections = Lists.newArrayList(); + private final List peerConnections = newArrayList(); @Before public void setup() { for (int i = 0; i < 4; i++) { final PublicKey pubKey = PublicKey.create(BigInteger.valueOf(i)); publicKeys.add(pubKey); + final Address address = Util.publicKeyToAddress(pubKey); final PeerInfo peerInfo = mock(PeerInfo.class); final PeerConnection peerConnection = mock(PeerConnection.class); when(peerConnection.getPeer()).thenReturn(peerInfo); - when(peerInfo.getNodeId()).thenReturn(pubKey.getEncodedBytes()); + when(peerInfo.getAddress()).thenReturn(address); peerConnections.add(peerConnection); } @@ -93,7 +94,7 @@ public void doesntSendToValidatorsWhichAreNotDirectlyConnected() throws PeerNotC final IbftNetworkPeers peers = new IbftNetworkPeers(validatorProvider); // only add peer connections 1, 2 & 3, none of which should be invoked. - Lists.newArrayList(1, 2, 3).forEach(i -> peers.peerAdded(peerConnections.get(i))); + newArrayList(1, 2, 3).forEach(i -> peers.peerAdded(peerConnections.get(i))); final MessageData messageToSend = new RawMessage(1, BytesValue.EMPTY); peers.multicastToValidators(messageToSend); @@ -103,4 +104,27 @@ public void doesntSendToValidatorsWhichAreNotDirectlyConnected() throws PeerNotC verify(peerConnections.get(2), never()).sendForProtocol(any(), any()); verify(peerConnections.get(3), never()).sendForProtocol(any(), any()); } + + @Test + public void onlyValidatorsAreSentAMessageNotInExcludes() throws PeerNotConnected { + // Only add the first Peer's address to the validators. + final Address validatorAddress = Util.publicKeyToAddress(publicKeys.get(0)); + validators.add(validatorAddress); + validators.add(Util.publicKeyToAddress(publicKeys.get(1))); + final ValidatorProvider validatorProvider = mock(ValidatorProvider.class); + when(validatorProvider.getValidators()).thenReturn(validators); + + final IbftNetworkPeers peers = new IbftNetworkPeers(validatorProvider); + for (final PeerConnection peer : peerConnections) { + peers.peerAdded(peer); + } + + final MessageData messageToSend = new RawMessage(1, BytesValue.EMPTY); + peers.multicastToValidatorsExcept(messageToSend, newArrayList(validatorAddress)); + + verify(peerConnections.get(0), never()).sendForProtocol(any(), any()); + verify(peerConnections.get(1), times(1)).sendForProtocol(any(), any()); + verify(peerConnections.get(2), never()).sendForProtocol(any(), any()); + verify(peerConnections.get(3), never()).sendForProtocol(any(), any()); + } } diff --git a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/network/MockPeerFactory.java b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/network/MockPeerFactory.java new file mode 100644 index 0000000000..150e47b5a1 --- /dev/null +++ b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/network/MockPeerFactory.java @@ -0,0 +1,40 @@ +/* + * Copyright 2018 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.consensus.ibft.network; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import tech.pegasys.pantheon.ethereum.core.Address; +import tech.pegasys.pantheon.ethereum.core.AddressHelpers; +import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection; +import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo; + +public class MockPeerFactory { + public static PeerConnection create() { + return create(AddressHelpers.ofValue(9)); + } + + public static PeerConnection create(final Address address) { + final PeerConnection peerConnection = mock(PeerConnection.class); + final PeerInfo peerInfo = createPeerInfo(address); + when(peerConnection.getPeer()).thenReturn(peerInfo); + return peerConnection; + } + + public static PeerInfo createPeerInfo(final Address address) { + final PeerInfo peerInfo = mock(PeerInfo.class); + when(peerInfo.getAddress()).thenReturn(address); + return peerInfo; + } +} diff --git a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftBlockHeightManagerTest.java b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftBlockHeightManagerTest.java index 2475399490..713b2a38d9 100644 --- a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftBlockHeightManagerTest.java +++ b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftBlockHeightManagerTest.java @@ -233,7 +233,7 @@ public void onRoundChangeReceptionRoundChangeManagerIsInvokedAndNewRoundStarted( manager.start(); verify(roundFactory).createNewRound(any(), eq(0)); - manager.handleRoundChangeMessage(roundChangePayload); + manager.handleRoundChangePayload(roundChangePayload); verify(roundChangeManager, times(1)).appendRoundChangeMessage(roundChangePayload); verify(roundFactory, times(1)) @@ -279,7 +279,7 @@ public void whenSufficientRoundChangesAreReceivedANewRoundMessageIsTransmitted() messageValidatorFactory); manager.start(); - manager.handleRoundChangeMessage(roundChangePayload); + manager.handleRoundChangePayload(roundChangePayload); verify(messageTransmitter, times(1)) .multicastNewRound(eq(futureRoundIdentifier), eq(roundChangCert), any()); @@ -311,8 +311,8 @@ public void messagesForFutureRoundsAreBufferedAndUsedToPreloadNewRoundWhenItIsSt Hash.fromHexStringLenient("0"), Signature.create(BigInteger.ONE, BigInteger.ONE, (byte) 1)); - manager.handlePrepareMessage(preparePayload); - manager.handleCommitMessage(commitPayload); + manager.handlePreparePayload(preparePayload); + manager.handleCommitPayload(commitPayload); // Force a new round to be started at new round number. final SignedData newRound = @@ -321,7 +321,7 @@ public void messagesForFutureRoundsAreBufferedAndUsedToPreloadNewRoundWhenItIsSt new RoundChangeCertificate(Collections.emptyList()), messageFactory.createSignedProposalPayload(futureRoundIdentifier, createdBlock)); - manager.handleNewRoundMessage(newRound); + manager.handleNewRoundPayload(newRound); // Final state sets the Quorum Size to 3, so should send a Prepare and also a commit verify(messageTransmitter, times(1)).multicastPrepare(eq(futureRoundIdentifier), any()); @@ -349,8 +349,8 @@ public void preparedCertificateIncludedInRoundChangeMessageOnRoundTimeoutExpired validatorMessageFactory .get(1) .createSignedPreparePayload(roundIdentifier, Hash.fromHexStringLenient("0")); - manager.handlePrepareMessage(preparePayload); - manager.handlePrepareMessage(secondPreparePayload); + manager.handlePreparePayload(preparePayload); + manager.handlePreparePayload(secondPreparePayload); manager.roundExpired(new RoundExpiry(roundIdentifier)); diff --git a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftControllerTest.java b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftControllerTest.java index 2ebe78f0cd..85ecdd4b08 100644 --- a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftControllerTest.java +++ b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftControllerTest.java @@ -12,7 +12,7 @@ */ package tech.pegasys.pantheon.consensus.ibft.statemachine; -import static org.assertj.core.api.Java6Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.util.Lists.newArrayList; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.atLeastOnce; @@ -22,16 +22,17 @@ import static org.mockito.Mockito.when; import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier; +import tech.pegasys.pantheon.consensus.ibft.IbftGossip; import tech.pegasys.pantheon.consensus.ibft.ibftevent.BlockTimerExpiry; import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftReceivedMessageEvent; import tech.pegasys.pantheon.consensus.ibft.ibftevent.NewChainHead; import tech.pegasys.pantheon.consensus.ibft.ibftevent.RoundExpiry; -import tech.pegasys.pantheon.consensus.ibft.ibftmessage.CommitMessage; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.CommitMessageData; import tech.pegasys.pantheon.consensus.ibft.ibftmessage.IbftV2; -import tech.pegasys.pantheon.consensus.ibft.ibftmessage.NewRoundMessage; -import tech.pegasys.pantheon.consensus.ibft.ibftmessage.PrepareMessage; -import tech.pegasys.pantheon.consensus.ibft.ibftmessage.ProposalMessage; -import tech.pegasys.pantheon.consensus.ibft.ibftmessage.RoundChangeMessage; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.NewRoundMessageData; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.PrepareMessageData; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.ProposalMessageData; +import tech.pegasys.pantheon.consensus.ibft.ibftmessage.RoundChangeMessageData; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.CommitPayload; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.NewRoundPayload; import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.PreparePayload; @@ -41,7 +42,8 @@ import tech.pegasys.pantheon.ethereum.chain.Blockchain; import tech.pegasys.pantheon.ethereum.core.Address; import tech.pegasys.pantheon.ethereum.core.BlockHeader; -import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; +import tech.pegasys.pantheon.ethereum.p2p.api.Message; +import tech.pegasys.pantheon.ethereum.p2p.wire.DefaultMessage; import java.util.HashMap; import java.util.List; @@ -64,39 +66,46 @@ public class IbftControllerTest { @Mock private IbftBlockHeightManager blockHeightManager; @Mock private SignedData signedProposal; - @Mock private ProposalMessage proposalMessage; + private Message proposalMessage; + @Mock private ProposalMessageData proposalMessageData; @Mock private ProposalPayload proposalPayload; @Mock private SignedData signedPrepare; - @Mock private PrepareMessage prepareMessage; + private Message prepareMessage; + @Mock private PrepareMessageData prepareMessageData; @Mock private PreparePayload preparePayload; @Mock private SignedData signedCommit; - @Mock private CommitMessage commitMessage; + private Message commitMessage; + @Mock private CommitMessageData commitMessageData; @Mock private CommitPayload commitPayload; @Mock private SignedData signedNewRound; - @Mock private NewRoundMessage newRoundMessage; + private Message newRoundMessage; + @Mock private NewRoundMessageData newRoundMessageData; @Mock private NewRoundPayload newRoundPayload; @Mock private SignedData signedRoundChange; - @Mock private RoundChangeMessage roundChangeMessage; + private Message roundChangeMessage; + @Mock private RoundChangeMessageData roundChangeMessageData; @Mock private RoundChangePayload roundChangePayload; - private final Map> futureMessages = new HashMap<>(); + private final Map> futureMessages = new HashMap<>(); private final Address validator = Address.fromHexString("0x0"); private final Address unknownValidator = Address.fromHexString("0x2"); private final ConsensusRoundIdentifier futureRoundIdentifier = new ConsensusRoundIdentifier(2, 0); private ConsensusRoundIdentifier roundIdentifier = new ConsensusRoundIdentifier(0, 0); + @Mock private IbftGossip ibftGossip; private IbftController ibftController; @Before public void setup() { - ibftController = - new IbftController(blockChain, ibftFinalState, blockHeightManagerFactory, futureMessages); when(blockChain.getChainHeadHeader()).thenReturn(blockHeader); when(blockHeightManagerFactory.create(blockHeader)).thenReturn(blockHeightManager); when(ibftFinalState.getValidators()).thenReturn(ImmutableList.of(validator)); + ibftController = + new IbftController( + blockChain, ibftFinalState, blockHeightManagerFactory, futureMessages, ibftGossip); } @Test @@ -115,9 +124,9 @@ public void startsNewBlockHeightManagerAndReplaysFutureMessages() { setupRoundChange(futureRoundIdentifier, validator); setupNewRound(roundIdentifierHeight3, validator); - final List height2Msgs = + final List height2Msgs = newArrayList(prepareMessage, commitMessage, roundChangeMessage); - final List height3Msgs = newArrayList(proposalMessage, newRoundMessage); + final List height3Msgs = newArrayList(proposalMessage, newRoundMessage); futureMessages.put(2L, height2Msgs); futureMessages.put(3L, height3Msgs); when(blockHeightManager.getChainHeight()).thenReturn(2L); @@ -128,11 +137,14 @@ public void startsNewBlockHeightManagerAndReplaysFutureMessages() { verify(blockHeightManagerFactory).create(blockHeader); verify(blockHeightManager, atLeastOnce()).getChainHeight(); verify(blockHeightManager).start(); - verify(blockHeightManager, never()).handleProposalMessage(signedProposal); - verify(blockHeightManager).handlePrepareMessage(signedPrepare); - verify(blockHeightManager).handleCommitMessage(signedCommit); - verify(blockHeightManager).handleRoundChangeMessage(signedRoundChange); - verify(blockHeightManager, never()).handleNewRoundMessage(signedNewRound); + verify(blockHeightManager, never()).handleProposalPayload(signedProposal); + verify(blockHeightManager).handlePreparePayload(signedPrepare); + verify(ibftGossip).gossipMessage(prepareMessage); + verify(blockHeightManager).handleCommitPayload(signedCommit); + verify(ibftGossip).gossipMessage(commitMessage); + verify(blockHeightManager).handleRoundChangePayload(signedRoundChange); + verify(ibftGossip).gossipMessage(roundChangeMessage); + verify(blockHeightManager, never()).handleNewRoundPayload(signedNewRound); } @Test @@ -155,11 +167,16 @@ public void createsNewBlockHeightManagerAndReplaysFutureMessagesOnNewChainHeadEv verify(blockHeightManagerFactory).create(blockHeader); verify(blockHeightManager, atLeastOnce()).getChainHeight(); verify(blockHeightManager).start(); - verify(blockHeightManager).handleProposalMessage(signedProposal); - verify(blockHeightManager).handlePrepareMessage(signedPrepare); - verify(blockHeightManager).handleCommitMessage(signedCommit); - verify(blockHeightManager).handleRoundChangeMessage(signedRoundChange); - verify(blockHeightManager).handleNewRoundMessage(signedNewRound); + verify(blockHeightManager).handleProposalPayload(signedProposal); + verify(ibftGossip).gossipMessage(proposalMessage); + verify(blockHeightManager).handlePreparePayload(signedPrepare); + verify(ibftGossip).gossipMessage(prepareMessage); + verify(blockHeightManager).handleCommitPayload(signedCommit); + verify(ibftGossip).gossipMessage(commitMessage); + verify(blockHeightManager).handleRoundChangePayload(signedRoundChange); + verify(ibftGossip).gossipMessage(roundChangeMessage); + verify(blockHeightManager).handleNewRoundPayload(signedNewRound); + verify(ibftGossip).gossipMessage(newRoundMessage); } @Test @@ -189,7 +206,8 @@ public void proposalForCurrentHeightIsPassedToBlockHeightManager() { ibftController.handleMessageEvent(new IbftReceivedMessageEvent(proposalMessage)); assertThat(futureMessages).isEmpty(); - verify(blockHeightManager).handleProposalMessage(signedProposal); + verify(blockHeightManager).handleProposalPayload(signedProposal); + verify(ibftGossip).gossipMessage(proposalMessage); verify(blockHeightManager, atLeastOnce()).getChainHeight(); verify(blockHeightManager).start(); verifyNoMoreInteractions(blockHeightManager); @@ -202,7 +220,8 @@ public void prepareForCurrentHeightIsPassedToBlockHeightManager() { ibftController.handleMessageEvent(new IbftReceivedMessageEvent(prepareMessage)); assertThat(futureMessages).isEmpty(); - verify(blockHeightManager).handlePrepareMessage(signedPrepare); + verify(blockHeightManager).handlePreparePayload(signedPrepare); + verify(ibftGossip).gossipMessage(prepareMessage); verify(blockHeightManager, atLeastOnce()).getChainHeight(); verify(blockHeightManager).start(); verifyNoMoreInteractions(blockHeightManager); @@ -215,7 +234,8 @@ public void commitForCurrentHeightIsPassedToBlockHeightManager() { ibftController.handleMessageEvent(new IbftReceivedMessageEvent(commitMessage)); assertThat(futureMessages).isEmpty(); - verify(blockHeightManager).handleCommitMessage(signedCommit); + verify(blockHeightManager).handleCommitPayload(signedCommit); + verify(ibftGossip).gossipMessage(commitMessage); verify(blockHeightManager, atLeastOnce()).getChainHeight(); verify(blockHeightManager).start(); verifyNoMoreInteractions(blockHeightManager); @@ -229,7 +249,8 @@ public void newRoundForCurrentHeightIsPassedToBlockHeightManager() { ibftController.handleMessageEvent(new IbftReceivedMessageEvent(newRoundMessage)); assertThat(futureMessages).isEmpty(); - verify(blockHeightManager).handleNewRoundMessage(signedNewRound); + verify(blockHeightManager).handleNewRoundPayload(signedNewRound); + verify(ibftGossip).gossipMessage(newRoundMessage); verify(blockHeightManager, atLeastOnce()).getChainHeight(); verify(blockHeightManager).start(); verifyNoMoreInteractions(blockHeightManager); @@ -243,7 +264,8 @@ public void roundChangeForCurrentHeightIsPassedToBlockHeightManager() { ibftController.handleMessageEvent(new IbftReceivedMessageEvent(roundChangeMessage)); assertThat(futureMessages).isEmpty(); - verify(blockHeightManager).handleRoundChangeMessage(signedRoundChange); + verify(blockHeightManager).handleRoundChangePayload(signedRoundChange); + verify(ibftGossip).gossipMessage(roundChangeMessage); verify(blockHeightManager, atLeastOnce()).getChainHeight(); verify(blockHeightManager).start(); verifyNoMoreInteractions(blockHeightManager); @@ -337,7 +359,7 @@ public void roundChangeForUnknownValidatorIsDiscarded() { @Test public void proposalForFutureHeightIsBuffered() { setupProposal(futureRoundIdentifier, validator); - final Map> expectedFutureMsgs = + final Map> expectedFutureMsgs = ImmutableMap.of(2L, ImmutableList.of(proposalMessage)); verifyHasFutureMessages(new IbftReceivedMessageEvent(proposalMessage), expectedFutureMsgs); } @@ -345,7 +367,7 @@ public void proposalForFutureHeightIsBuffered() { @Test public void prepareForFutureHeightIsBuffered() { setupPrepare(futureRoundIdentifier, validator); - final Map> expectedFutureMsgs = + final Map> expectedFutureMsgs = ImmutableMap.of(2L, ImmutableList.of(prepareMessage)); verifyHasFutureMessages(new IbftReceivedMessageEvent(prepareMessage), expectedFutureMsgs); } @@ -353,7 +375,7 @@ public void prepareForFutureHeightIsBuffered() { @Test public void commitForFutureHeightIsBuffered() { setupCommit(futureRoundIdentifier, validator); - final Map> expectedFutureMsgs = + final Map> expectedFutureMsgs = ImmutableMap.of(2L, ImmutableList.of(commitMessage)); verifyHasFutureMessages(new IbftReceivedMessageEvent(commitMessage), expectedFutureMsgs); } @@ -361,7 +383,7 @@ public void commitForFutureHeightIsBuffered() { @Test public void newRoundForFutureHeightIsBuffered() { setupNewRound(futureRoundIdentifier, validator); - final Map> expectedFutureMsgs = + final Map> expectedFutureMsgs = ImmutableMap.of(2L, ImmutableList.of(newRoundMessage)); verifyHasFutureMessages(new IbftReceivedMessageEvent(newRoundMessage), expectedFutureMsgs); } @@ -369,7 +391,7 @@ public void newRoundForFutureHeightIsBuffered() { @Test public void roundChangeForFutureHeightIsBuffered() { setupRoundChange(futureRoundIdentifier, validator); - final Map> expectedFutureMsgs = + final Map> expectedFutureMsgs = ImmutableMap.of(2L, ImmutableList.of(roundChangeMessage)); verifyHasFutureMessages(new IbftReceivedMessageEvent(roundChangeMessage), expectedFutureMsgs); } @@ -385,7 +407,7 @@ private void verifyNotHandledAndNoFutureMsgs(final IbftReceivedMessageEvent msg) } private void verifyHasFutureMessages( - final IbftReceivedMessageEvent msg, final Map> expectedFutureMsgs) { + final IbftReceivedMessageEvent msg, final Map> expectedFutureMsgs) { ibftController.start(); ibftController.handleMessageEvent(msg); @@ -401,8 +423,9 @@ private void setupProposal( when(signedProposal.getPayload()).thenReturn(proposalPayload); when(signedProposal.getSender()).thenReturn(validator); when(proposalPayload.getRoundIdentifier()).thenReturn(roundIdentifier); - when(proposalMessage.getCode()).thenReturn(IbftV2.PROPOSAL); - when(proposalMessage.decode()).thenReturn(signedProposal); + when(proposalMessageData.getCode()).thenReturn(IbftV2.PROPOSAL); + when(proposalMessageData.decode()).thenReturn(signedProposal); + proposalMessage = new DefaultMessage(null, proposalMessageData); } private void setupPrepare( @@ -410,8 +433,9 @@ private void setupPrepare( when(signedPrepare.getPayload()).thenReturn(preparePayload); when(signedPrepare.getSender()).thenReturn(validator); when(preparePayload.getRoundIdentifier()).thenReturn(roundIdentifier); - when(prepareMessage.getCode()).thenReturn(IbftV2.PREPARE); - when(prepareMessage.decode()).thenReturn(signedPrepare); + when(prepareMessageData.getCode()).thenReturn(IbftV2.PREPARE); + when(prepareMessageData.decode()).thenReturn(signedPrepare); + prepareMessage = new DefaultMessage(null, prepareMessageData); } private void setupCommit( @@ -419,8 +443,9 @@ private void setupCommit( when(signedCommit.getPayload()).thenReturn(commitPayload); when(signedCommit.getSender()).thenReturn(validator); when(commitPayload.getRoundIdentifier()).thenReturn(roundIdentifier); - when(commitMessage.getCode()).thenReturn(IbftV2.COMMIT); - when(commitMessage.decode()).thenReturn(signedCommit); + when(commitMessageData.getCode()).thenReturn(IbftV2.COMMIT); + when(commitMessageData.decode()).thenReturn(signedCommit); + commitMessage = new DefaultMessage(null, commitMessageData); } private void setupNewRound( @@ -428,8 +453,9 @@ private void setupNewRound( when(signedNewRound.getPayload()).thenReturn(newRoundPayload); when(signedNewRound.getSender()).thenReturn(validator); when(newRoundPayload.getRoundIdentifier()).thenReturn(roundIdentifier); - when(newRoundMessage.getCode()).thenReturn(IbftV2.NEW_ROUND); - when(newRoundMessage.decode()).thenReturn(signedNewRound); + when(newRoundMessageData.getCode()).thenReturn(IbftV2.NEW_ROUND); + when(newRoundMessageData.decode()).thenReturn(signedNewRound); + newRoundMessage = new DefaultMessage(null, newRoundMessageData); } private void setupRoundChange( @@ -437,7 +463,8 @@ private void setupRoundChange( when(signedRoundChange.getPayload()).thenReturn(roundChangePayload); when(signedRoundChange.getSender()).thenReturn(validator); when(roundChangePayload.getRoundIdentifier()).thenReturn(roundIdentifier); - when(roundChangeMessage.getCode()).thenReturn(IbftV2.ROUND_CHANGE); - when(roundChangeMessage.decode()).thenReturn(signedRoundChange); + when(roundChangeMessageData.getCode()).thenReturn(IbftV2.ROUND_CHANGE); + when(roundChangeMessageData.decode()).thenReturn(signedRoundChange); + roundChangeMessage = new DefaultMessage(null, roundChangeMessageData); } } diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/AbstractMessageData.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/AbstractMessageData.java index 80da64bced..0ff7a35994 100644 --- a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/AbstractMessageData.java +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/AbstractMessageData.java @@ -15,6 +15,8 @@ import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.util.bytes.BytesValue; +import com.google.common.base.Objects; + public abstract class AbstractMessageData implements MessageData { protected final BytesValue data; @@ -32,4 +34,21 @@ public final int getSize() { public BytesValue getData() { return data; } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final AbstractMessageData that = (AbstractMessageData) o; + return Objects.equal(data, that.data); + } + + @Override + public int hashCode() { + return Objects.hashCode(data); + } } diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/PeerInfo.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/PeerInfo.java index 0fbaac1802..f7bd0daf56 100644 --- a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/PeerInfo.java +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/PeerInfo.java @@ -14,6 +14,9 @@ import static tech.pegasys.pantheon.util.bytes.BytesValue.wrap; +import tech.pegasys.pantheon.crypto.SECP256K1.PublicKey; +import tech.pegasys.pantheon.ethereum.core.Address; +import tech.pegasys.pantheon.ethereum.core.Util; import tech.pegasys.pantheon.ethereum.rlp.RLPInput; import tech.pegasys.pantheon.ethereum.rlp.RLPOutput; import tech.pegasys.pantheon.util.bytes.BytesValue; @@ -82,6 +85,11 @@ public BytesValue getNodeId() { return nodeId; } + public Address getAddress() { + final PublicKey remotePublicKey = PublicKey.create(nodeId); + return Util.publicKeyToAddress(remotePublicKey); + } + public void writeTo(final RLPOutput out) { out.startList(); out.writeUnsignedByte(getVersion());