diff --git a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/TestContextBuilder.java b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/TestContextBuilder.java index 03f0e6535b..7880b44754 100644 --- a/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/TestContextBuilder.java +++ b/consensus/ibft/src/integration-test/java/tech/pegasys/pantheon/consensus/ibft/support/TestContextBuilder.java @@ -25,6 +25,7 @@ import tech.pegasys.pantheon.consensus.common.VoteTallyUpdater; import tech.pegasys.pantheon.consensus.ibft.BlockTimer; import tech.pegasys.pantheon.consensus.ibft.EventMultiplexer; +import tech.pegasys.pantheon.consensus.ibft.Gossiper; import tech.pegasys.pantheon.consensus.ibft.IbftBlockHashing; import tech.pegasys.pantheon.consensus.ibft.IbftBlockInterface; import tech.pegasys.pantheon.consensus.ibft.IbftContext; @@ -34,6 +35,7 @@ import tech.pegasys.pantheon.consensus.ibft.IbftHelpers; import tech.pegasys.pantheon.consensus.ibft.IbftProtocolSchedule; import tech.pegasys.pantheon.consensus.ibft.RoundTimer; +import tech.pegasys.pantheon.consensus.ibft.UniqueMessageMulticaster; import tech.pegasys.pantheon.consensus.ibft.blockcreation.IbftBlockCreatorFactory; import tech.pegasys.pantheon.consensus.ibft.blockcreation.ProposerSelector; import tech.pegasys.pantheon.consensus.ibft.payload.MessageFactory; @@ -156,7 +158,9 @@ public TestContext build() { // Use a stubbed version of the multicaster, to prevent creating PeerConnections etc. final StubValidatorMulticaster multicaster = new StubValidatorMulticaster(); - final IbftGossip gossiper = useGossip ? new IbftGossip(multicaster) : mock(IbftGossip.class); + final UniqueMessageMulticaster uniqueMulticaster = new UniqueMessageMulticaster(multicaster); + + final Gossiper gossiper = useGossip ? new IbftGossip(uniqueMulticaster) : mock(Gossiper.class); final ControllerAndState controllerAndState = createControllerAndFinalState( @@ -219,11 +223,11 @@ private static Block createGenesisBlock(final Set
validators) { private static ControllerAndState createControllerAndFinalState( final MutableBlockchain blockChain, - final StubValidatorMulticaster stubbedMulticaster, + final StubValidatorMulticaster multicaster, final KeyPair nodeKeys, final Clock clock, final IbftEventQueue ibftEventQueue, - final IbftGossip gossiper) { + final Gossiper gossiper) { final WorldStateArchive worldStateArchive = createInMemoryWorldStateArchive(); @@ -272,7 +276,7 @@ private static ControllerAndState createControllerAndFinalState( nodeKeys, Util.publicKeyToAddress(nodeKeys.getPublicKey()), proposerSelector, - stubbedMulticaster, + multicaster, new RoundTimer( ibftEventQueue, ROUND_TIMER_SEC * 1000, Executors.newScheduledThreadPool(1)), new BlockTimer( @@ -298,8 +302,8 @@ private static ControllerAndState createControllerAndFinalState( new IbftRoundFactory( finalState, protocolContext, protocolSchedule, minedBlockObservers), messageValidatorFactory), - new HashMap<>(), - gossiper); + gossiper, + new HashMap<>()); final EventMultiplexer eventMultiplexer = new EventMultiplexer(ibftController); //////////////////////////// END IBFT PantheonController //////////////////////////// diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/Gossiper.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/Gossiper.java new file mode 100644 index 0000000000..cbac83ac65 --- /dev/null +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/Gossiper.java @@ -0,0 +1,20 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.consensus.ibft; + +import tech.pegasys.pantheon.ethereum.p2p.api.Message; + +public interface Gossiper { + + void send(Message message); +} diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/IbftGossip.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/IbftGossip.java index bccd9da18b..64cbbcb973 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/IbftGossip.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/IbftGossip.java @@ -20,40 +20,18 @@ import tech.pegasys.pantheon.consensus.ibft.messagedata.RoundChangeMessageData; import tech.pegasys.pantheon.consensus.ibft.network.ValidatorMulticaster; import tech.pegasys.pantheon.consensus.ibft.payload.SignedData; -import tech.pegasys.pantheon.crypto.SECP256K1.Signature; import tech.pegasys.pantheon.ethereum.core.Address; import tech.pegasys.pantheon.ethereum.p2p.api.Message; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; -import java.util.Collections; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; -import java.util.Set; import com.google.common.collect.Lists; /** Class responsible for rebroadcasting IBFT messages to known validators */ -public class IbftGossip { - private final ValidatorMulticaster multicaster; - - // Size of the seenMessages cache, should end up utilising 65bytes * this number + some meta data - private final int maxSeenMessages; - - // Set that starts evicting members when it hits capacity - private final Set seenMessages = - Collections.newSetFromMap( - new LinkedHashMap() { - @Override - protected boolean removeEldestEntry(final Map.Entry eldest) { - return size() > maxSeenMessages; - } - }); +public class IbftGossip implements Gossiper { - IbftGossip(final ValidatorMulticaster multicaster, final int maxSeenMessages) { - this.maxSeenMessages = maxSeenMessages; - this.multicaster = multicaster; - } + private final ValidatorMulticaster multicaster; /** * Constructor that attaches gossip logic to a set of multicaster @@ -61,16 +39,16 @@ protected boolean removeEldestEntry(final Map.Entry eldest) * @param multicaster Network connections to the remote validators */ public IbftGossip(final ValidatorMulticaster multicaster) { - this(multicaster, 10_000); + this.multicaster = multicaster; } /** * Retransmit a given IBFT message to other known validators nodes * * @param message The raw message to be gossiped - * @return Whether the message was rebroadcast or has been ignored as a repeat */ - public boolean gossipMessage(final Message message) { + @Override + public void send(final Message message) { final MessageData messageData = message.getData(); final SignedData signedData; switch (messageData.getCode()) { @@ -93,16 +71,9 @@ public boolean gossipMessage(final Message message) { throw new IllegalArgumentException( "Received message does not conform to any recognised IBFT message structure."); } - final Signature signature = signedData.getSignature(); - if (seenMessages.contains(signature)) { - return false; - } else { - final List
excludeAddressesList = - Lists.newArrayList( - message.getConnection().getPeer().getAddress(), signedData.getSender()); - multicaster.send(messageData, excludeAddressesList); - seenMessages.add(signature); - return true; - } + final List
excludeAddressesList = + Lists.newArrayList(message.getConnection().getPeer().getAddress(), signedData.getSender()); + + multicaster.send(messageData, excludeAddressesList); } } diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/UniqueMessageMulticaster.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/UniqueMessageMulticaster.java new file mode 100644 index 0000000000..dc6dc3bbae --- /dev/null +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/UniqueMessageMulticaster.java @@ -0,0 +1,68 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.consensus.ibft; + +import tech.pegasys.pantheon.consensus.ibft.network.ValidatorMulticaster; +import tech.pegasys.pantheon.ethereum.core.Address; +import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; + +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; + +public class UniqueMessageMulticaster implements ValidatorMulticaster { + + private final int maxSeenMessages; + private final ValidatorMulticaster multicaster; + + UniqueMessageMulticaster(final ValidatorMulticaster multicaster, final int maxSeenMessages) { + this.maxSeenMessages = maxSeenMessages; + this.multicaster = multicaster; + } + + /** + * Constructor that attaches gossip logic to a set of multicaster + * + * @param multicaster Network connections to the remote validators + */ + public UniqueMessageMulticaster(final ValidatorMulticaster multicaster) { + this(multicaster, 10_000); + } + + // Set that starts evicting members when it hits capacity + private final Set seenMessages = + Collections.newSetFromMap( + new LinkedHashMap() { + @Override + protected boolean removeEldestEntry(final Map.Entry eldest) { + return size() > maxSeenMessages; + } + }); + + @Override + public void send(final MessageData message) { + send(message, Collections.emptyList()); + } + + @Override + public void send(final MessageData message, final Collection
blackList) { + final int uniqueID = message.hashCode(); + if (seenMessages.contains(uniqueID)) { + return; + } + multicaster.send(message, blackList); + seenMessages.add(uniqueID); + } +} diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftController.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftController.java index 6c48df8fd1..c050ce272a 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftController.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftController.java @@ -15,6 +15,7 @@ import static java.util.Collections.emptyList; import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier; +import tech.pegasys.pantheon.consensus.ibft.Gossiper; import tech.pegasys.pantheon.consensus.ibft.IbftGossip; import tech.pegasys.pantheon.consensus.ibft.ibftevent.BlockTimerExpiry; import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftReceivedMessageEvent; @@ -51,18 +52,14 @@ public class IbftController { private final IbftBlockHeightManagerFactory ibftBlockHeightManagerFactory; private final Map> futureMessages; private BlockHeightManager currentHeightManager; - private final IbftGossip gossiper; + private final Gossiper gossiper; public IbftController( final Blockchain blockchain, final IbftFinalState ibftFinalState, - final IbftBlockHeightManagerFactory ibftBlockHeightManagerFactory) { - this( - blockchain, - ibftFinalState, - ibftBlockHeightManagerFactory, - Maps.newHashMap(), - new IbftGossip(ibftFinalState.getValidatorMulticaster())); + final IbftBlockHeightManagerFactory ibftBlockHeightManagerFactory, + final IbftGossip gossiper) { + this(blockchain, ibftFinalState, ibftBlockHeightManagerFactory, gossiper, Maps.newHashMap()); } @VisibleForTesting @@ -70,8 +67,8 @@ public IbftController( final Blockchain blockchain, final IbftFinalState ibftFinalState, final IbftBlockHeightManagerFactory ibftBlockHeightManagerFactory, - final Map> futureMessages, - final IbftGossip gossiper) { + final Gossiper gossiper, + final Map> futureMessages) { this.blockchain = blockchain; this.ibftFinalState = ibftFinalState; this.ibftBlockHeightManagerFactory = ibftBlockHeightManagerFactory; @@ -142,7 +139,7 @@ private

void consumeMessage( signedPayload.getPayload().getMessageType(), signedPayload); if (processMessage(signedPayload, message)) { - gossiper.gossipMessage(message); + gossiper.send(message); handleMessage.accept(signedPayload); } } diff --git a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/IbftGossipTest.java b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/IbftGossipTest.java index e189b9e76c..a188e16726 100644 --- a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/IbftGossipTest.java +++ b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/IbftGossipTest.java @@ -13,19 +13,14 @@ package tech.pegasys.pantheon.consensus.ibft; import static com.google.common.collect.Lists.newArrayList; -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import tech.pegasys.pantheon.consensus.ibft.messagedata.CommitMessageData; import tech.pegasys.pantheon.consensus.ibft.messagedata.NewRoundMessageData; -import tech.pegasys.pantheon.consensus.ibft.messagedata.PrepareMessageData; import tech.pegasys.pantheon.consensus.ibft.messagedata.ProposalMessageData; import tech.pegasys.pantheon.consensus.ibft.messagedata.RoundChangeMessageData; import tech.pegasys.pantheon.consensus.ibft.network.MockPeerFactory; import tech.pegasys.pantheon.consensus.ibft.network.ValidatorMulticaster; import tech.pegasys.pantheon.consensus.ibft.payload.Payload; -import tech.pegasys.pantheon.consensus.ibft.payload.ProposalPayload; import tech.pegasys.pantheon.consensus.ibft.payload.SignedData; import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair; import tech.pegasys.pantheon.ethereum.core.Address; @@ -52,7 +47,7 @@ public class IbftGossipTest { @Before public void setup() { - ibftGossip = new IbftGossip(validatorMulticaster, 10); + ibftGossip = new IbftGossip(validatorMulticaster); peerConnection = MockPeerFactory.create(senderAddress); } @@ -64,131 +59,26 @@ private

void assertRebroadcastToAllExceptSignerAndSender( final MessageData messageData = createMessageData.apply(payload); final Message message = new DefaultMessage(peerConnection, messageData); - final boolean gossipResult = ibftGossip.gossipMessage(message); - assertThat(gossipResult).isTrue(); + ibftGossip.send(message); verify(validatorMulticaster) .send(messageData, newArrayList(senderAddress, payload.getSender())); } - private

void assertRebroadcastOnlyOnce( - final Function> createPayload, - final Function, MessageData> createMessageData) { - final KeyPair keypair = KeyPair.generate(); - final SignedData

payload = createPayload.apply(keypair); - final MessageData messageData = createMessageData.apply(payload); - final Message message = new DefaultMessage(peerConnection, messageData); - - final boolean gossip1Result = ibftGossip.gossipMessage(message); - final boolean gossip2Result = ibftGossip.gossipMessage(message); - assertThat(gossip1Result).isTrue(); - assertThat(gossip2Result).isFalse(); - verify(validatorMulticaster, times(1)) - .send(messageData, newArrayList(senderAddress, payload.getSender())); - } - @Test public void assertRebroadcastsProposalToAllExceptSignerAndSender() { assertRebroadcastToAllExceptSignerAndSender( TestHelpers::createSignedProposalPayload, ProposalMessageData::create); } - @Test - public void assertRebroadcastsProposalOnlyOnce() { - assertRebroadcastOnlyOnce( - TestHelpers::createSignedProposalPayload, ProposalMessageData::create); - } - - @Test - public void assertRebroadcastsPrepareToAllExceptSignerAndSender() { - assertRebroadcastToAllExceptSignerAndSender( - TestHelpers::createSignedPreparePayload, PrepareMessageData::create); - } - - @Test - public void assertRebroadcastsPrepareOnlyOnce() { - assertRebroadcastOnlyOnce(TestHelpers::createSignedPreparePayload, PrepareMessageData::create); - } - - @Test - public void assertRebroadcastsCommitToAllExceptSignerAndSender() { - assertRebroadcastToAllExceptSignerAndSender( - TestHelpers::createSignedCommitPayload, CommitMessageData::create); - } - - @Test - public void assertRebroadcastsCommitOnlyOnce() { - assertRebroadcastOnlyOnce(TestHelpers::createSignedCommitPayload, CommitMessageData::create); - } - @Test public void assertRebroadcastsRoundChangeToAllExceptSignerAndSender() { assertRebroadcastToAllExceptSignerAndSender( TestHelpers::createSignedRoundChangePayload, RoundChangeMessageData::create); } - @Test - public void assertRebroadcastsRoundChangeOnlyOnce() { - assertRebroadcastOnlyOnce( - TestHelpers::createSignedRoundChangePayload, RoundChangeMessageData::create); - } - @Test public void assertRebroadcastsNewRoundToAllExceptSignerAndSender() { assertRebroadcastToAllExceptSignerAndSender( TestHelpers::createSignedNewRoundPayload, NewRoundMessageData::create); } - - @Test - public void assertRebroadcastsNewRoundOnlyOnce() { - assertRebroadcastOnlyOnce( - TestHelpers::createSignedNewRoundPayload, NewRoundMessageData::create); - } - - @Test - public void evictMessageRecordAtCapacity() { - final KeyPair keypair = KeyPair.generate(); - final SignedData payload = - TestHelpers.createSignedProposalPayloadWithRound(keypair, 0); - final MessageData messageData = ProposalMessageData.create(payload); - final Message message = new DefaultMessage(peerConnection, messageData); - final boolean gossip1Result = ibftGossip.gossipMessage(message); - final boolean gossip2Result = ibftGossip.gossipMessage(message); - assertThat(gossip1Result).isTrue(); - assertThat(gossip2Result).isFalse(); - verify(validatorMulticaster, times(1)) - .send(messageData, newArrayList(senderAddress, payload.getSender())); - - for (int i = 1; i <= 9; i++) { - final SignedData nextPayload = - TestHelpers.createSignedProposalPayloadWithRound(keypair, i); - final MessageData nextMessageData = ProposalMessageData.create(nextPayload); - final Message nextMessage = new DefaultMessage(peerConnection, nextMessageData); - final boolean nextGossipResult = ibftGossip.gossipMessage(nextMessage); - assertThat(nextGossipResult).isTrue(); - } - - final boolean gossip3Result = ibftGossip.gossipMessage(message); - assertThat(gossip3Result).isFalse(); - verify(validatorMulticaster, times(1)) - .send(messageData, newArrayList(senderAddress, payload.getSender())); - - { - final SignedData nextPayload = - TestHelpers.createSignedProposalPayloadWithRound(keypair, 10); - final MessageData nextMessageData = ProposalMessageData.create(nextPayload); - final Message nextMessage = new DefaultMessage(peerConnection, nextMessageData); - final boolean nextGossipResult = ibftGossip.gossipMessage(nextMessage); - assertThat(nextGossipResult).isTrue(); - } - - final boolean gossip4Result = ibftGossip.gossipMessage(message); - assertThat(gossip4Result).isTrue(); - verify(validatorMulticaster, times(2)) - .send(messageData, newArrayList(senderAddress, payload.getSender())); - - final boolean gossip5Result = ibftGossip.gossipMessage(message); - assertThat(gossip5Result).isFalse(); - verify(validatorMulticaster, times(2)) - .send(messageData, newArrayList(senderAddress, payload.getSender())); - } } diff --git a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/UniqueMessageMulticasterTest.java b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/UniqueMessageMulticasterTest.java new file mode 100644 index 0000000000..a832780423 --- /dev/null +++ b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/UniqueMessageMulticasterTest.java @@ -0,0 +1,93 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.consensus.ibft; + +import static java.util.Collections.emptyList; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; + +import tech.pegasys.pantheon.consensus.ibft.network.ValidatorMulticaster; +import tech.pegasys.pantheon.ethereum.core.Address; +import tech.pegasys.pantheon.ethereum.core.AddressHelpers; +import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; +import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage; +import tech.pegasys.pantheon.util.bytes.BytesValue; + +import java.util.List; + +import com.google.common.collect.Lists; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class UniqueMessageMulticasterTest { + + private final ValidatorMulticaster multicaster = mock(ValidatorMulticaster.class); + private final UniqueMessageMulticaster messageTracker = + new UniqueMessageMulticaster(multicaster, 5); + private final RawMessage messageSent = new RawMessage(5, BytesValue.wrap(new byte[5])); + + @Test + public void previouslySentMessageIsNotSentAgain() { + + messageTracker.send(messageSent); + verify(multicaster, times(1)).send(messageSent, emptyList()); + reset(multicaster); + + messageTracker.send(messageSent); + messageTracker.send(messageSent, emptyList()); + verifyZeroInteractions(multicaster); + } + + @Test + public void messagesSentWithABlackListAreNotRetransmitted() { + messageTracker.send(messageSent, emptyList()); + verify(multicaster, times(1)).send(messageSent, emptyList()); + reset(multicaster); + + messageTracker.send(messageSent, emptyList()); + messageTracker.send(messageSent); + verifyZeroInteractions(multicaster); + } + + @Test + public void oldMessagesAreEvictedWhenFullAndCanThenBeRetransmitted() { + final List messagesSent = Lists.newArrayList(); + + for (int i = 0; i < 6; i++) { + final RawMessage msg = new RawMessage(i, BytesValue.wrap(new byte[i])); + messagesSent.add(msg); + messageTracker.send(msg); + verify(multicaster, times(1)).send(msg, emptyList()); + } + reset(multicaster); + + messageTracker.send(messagesSent.get(5)); + verifyZeroInteractions(multicaster); + + messageTracker.send(messagesSent.get(0)); + verify(multicaster, times(1)).send(messagesSent.get(0), emptyList()); + } + + @Test + public void passedInBlackListIsPassedToUnderlyingValidator() { + List

blackList = + Lists.newArrayList(AddressHelpers.ofValue(0), AddressHelpers.ofValue(1)); + messageTracker.send(messageSent, blackList); + verify(multicaster, times(1)).send(messageSent, blackList); + } +} diff --git a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftControllerTest.java b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftControllerTest.java index 9beaa38d9b..4c8a00a389 100644 --- a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftControllerTest.java +++ b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/statemachine/IbftControllerTest.java @@ -108,7 +108,7 @@ public void setup() { when(ibftFinalState.getValidators()).thenReturn(ImmutableList.of(validator)); ibftController = new IbftController( - blockChain, ibftFinalState, blockHeightManagerFactory, futureMessages, ibftGossip); + blockChain, ibftFinalState, blockHeightManagerFactory, ibftGossip, futureMessages); when(chainHeadBlockHeader.getNumber()).thenReturn(1L); when(chainHeadBlockHeader.getHash()).thenReturn(Hash.ZERO); @@ -151,11 +151,11 @@ public void startsNewBlockHeightManagerAndReplaysFutureMessages() { verify(blockHeightManager).start(); verify(blockHeightManager, never()).handleProposalPayload(signedProposal); verify(blockHeightManager).handlePreparePayload(signedPrepare); - verify(ibftGossip).gossipMessage(prepareMessage); + verify(ibftGossip).send(prepareMessage); verify(blockHeightManager).handleCommitPayload(signedCommit); - verify(ibftGossip).gossipMessage(commitMessage); + verify(ibftGossip).send(commitMessage); verify(blockHeightManager).handleRoundChangePayload(signedRoundChange); - verify(ibftGossip).gossipMessage(roundChangeMessage); + verify(ibftGossip).send(roundChangeMessage); verify(blockHeightManager, never()).handleNewRoundPayload(signedNewRound); } @@ -181,15 +181,15 @@ public void createsNewBlockHeightManagerAndReplaysFutureMessagesOnNewChainHeadEv verify(blockHeightManager, atLeastOnce()).getChainHeight(); verify(blockHeightManager, times(2)).start(); // once at beginning, and again on newChainHead. verify(blockHeightManager).handleProposalPayload(signedProposal); - verify(ibftGossip).gossipMessage(proposalMessage); + verify(ibftGossip).send(proposalMessage); verify(blockHeightManager).handlePreparePayload(signedPrepare); - verify(ibftGossip).gossipMessage(prepareMessage); + verify(ibftGossip).send(prepareMessage); verify(blockHeightManager).handleCommitPayload(signedCommit); - verify(ibftGossip).gossipMessage(commitMessage); + verify(ibftGossip).send(commitMessage); verify(blockHeightManager).handleRoundChangePayload(signedRoundChange); - verify(ibftGossip).gossipMessage(roundChangeMessage); + verify(ibftGossip).send(roundChangeMessage); verify(blockHeightManager).handleNewRoundPayload(signedNewRound); - verify(ibftGossip).gossipMessage(newRoundMessage); + verify(ibftGossip).send(newRoundMessage); } @Test @@ -239,7 +239,7 @@ public void proposalForCurrentHeightIsPassedToBlockHeightManager() { assertThat(futureMessages).isEmpty(); verify(blockHeightManager).handleProposalPayload(signedProposal); - verify(ibftGossip).gossipMessage(proposalMessage); + verify(ibftGossip).send(proposalMessage); verify(blockHeightManager, atLeastOnce()).getChainHeight(); verify(blockHeightManager).start(); verifyNoMoreInteractions(blockHeightManager); @@ -253,7 +253,7 @@ public void prepareForCurrentHeightIsPassedToBlockHeightManager() { assertThat(futureMessages).isEmpty(); verify(blockHeightManager).handlePreparePayload(signedPrepare); - verify(ibftGossip).gossipMessage(prepareMessage); + verify(ibftGossip).send(prepareMessage); verify(blockHeightManager, atLeastOnce()).getChainHeight(); verify(blockHeightManager).start(); verifyNoMoreInteractions(blockHeightManager); @@ -267,7 +267,7 @@ public void commitForCurrentHeightIsPassedToBlockHeightManager() { assertThat(futureMessages).isEmpty(); verify(blockHeightManager).handleCommitPayload(signedCommit); - verify(ibftGossip).gossipMessage(commitMessage); + verify(ibftGossip).send(commitMessage); verify(blockHeightManager, atLeastOnce()).getChainHeight(); verify(blockHeightManager).start(); verifyNoMoreInteractions(blockHeightManager); @@ -282,7 +282,7 @@ public void newRoundForCurrentHeightIsPassedToBlockHeightManager() { assertThat(futureMessages).isEmpty(); verify(blockHeightManager).handleNewRoundPayload(signedNewRound); - verify(ibftGossip).gossipMessage(newRoundMessage); + verify(ibftGossip).send(newRoundMessage); verify(blockHeightManager, atLeastOnce()).getChainHeight(); verify(blockHeightManager).start(); verifyNoMoreInteractions(blockHeightManager); @@ -297,7 +297,7 @@ public void roundChangeForCurrentHeightIsPassedToBlockHeightManager() { assertThat(futureMessages).isEmpty(); verify(blockHeightManager).handleRoundChangePayload(signedRoundChange); - verify(ibftGossip).gossipMessage(roundChangeMessage); + verify(ibftGossip).send(roundChangeMessage); verify(blockHeightManager, atLeastOnce()).getChainHeight(); verify(blockHeightManager).start(); verifyNoMoreInteractions(blockHeightManager); diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java index 3b5182c316..b9fd077e6a 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java @@ -26,9 +26,11 @@ import tech.pegasys.pantheon.consensus.ibft.IbftBlockInterface; import tech.pegasys.pantheon.consensus.ibft.IbftContext; import tech.pegasys.pantheon.consensus.ibft.IbftEventQueue; +import tech.pegasys.pantheon.consensus.ibft.IbftGossip; import tech.pegasys.pantheon.consensus.ibft.IbftProcessor; import tech.pegasys.pantheon.consensus.ibft.IbftProtocolSchedule; import tech.pegasys.pantheon.consensus.ibft.RoundTimer; +import tech.pegasys.pantheon.consensus.ibft.UniqueMessageMulticaster; import tech.pegasys.pantheon.consensus.ibft.blockcreation.IbftBlockCreatorFactory; import tech.pegasys.pantheon.consensus.ibft.blockcreation.IbftMiningCoordinator; import tech.pegasys.pantheon.consensus.ibft.blockcreation.ProposerSelector; @@ -96,7 +98,6 @@ public class IbftPantheonController implements PantheonController { private final IbftProtocolManager ibftProtocolManager; private final KeyPair keyPair; private final TransactionPool transactionPool; - private final IbftProcessor ibftProcessor; private final Runnable closer; IbftPantheonController( @@ -108,7 +109,6 @@ public class IbftPantheonController implements PantheonController { final Synchronizer synchronizer, final KeyPair keyPair, final TransactionPool transactionPool, - final IbftProcessor ibftProcessor, final Runnable closer) { this.protocolSchedule = protocolSchedule; @@ -119,7 +119,6 @@ public class IbftPantheonController implements PantheonController { this.synchronizer = synchronizer; this.keyPair = keyPair; this.transactionPool = transactionPool; - this.ibftProcessor = ibftProcessor; this.closer = closer; } @@ -202,17 +201,20 @@ public static PantheonController init( new ProposerSelector(blockchain, voteTally, blockInterface, true); final ValidatorPeers peers = new ValidatorPeers(protocolContext.getConsensusState().getVoteTally()); + final UniqueMessageMulticaster uniqueMessageMulticaster = new UniqueMessageMulticaster(peers); final Subscribers minedBlockObservers = new Subscribers<>(); minedBlockObservers.subscribe(ethProtocolManager); + final IbftGossip gossiper = new IbftGossip(peers); + final IbftFinalState finalState = new IbftFinalState( voteTally, nodeKeys, Util.publicKeyToAddress(nodeKeys.getPublicKey()), proposerSelector, - peers, + uniqueMessageMulticaster, new RoundTimer( ibftEventQueue, ibftConfig.getRequestTimeoutSeconds(), @@ -237,7 +239,8 @@ public static PantheonController init( finalState, new IbftRoundFactory( finalState, protocolContext, protocolSchedule, minedBlockObservers), - messageValidatorFactory)); + messageValidatorFactory), + gossiper); ibftController.start(); final EventMultiplexer eventMultiplexer = new EventMultiplexer(ibftController); @@ -275,7 +278,6 @@ public static PantheonController init( synchronizer, nodeKeys, transactionPool, - ibftProcessor, closer); }