Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

[NC-1909] IBFT message gossiping #501

Merged
merged 38 commits into from
Jan 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
79e7483
[NC-1909] renamed message types to message data to match their heirachy
Errorific Dec 20, 2018
4550cc0
[NC-1909] handle message and future buffer now deal with messages not…
Errorific Dec 20, 2018
c3718d2
[NC-1909] passing messages instead
Errorific Dec 20, 2018
d9df9df
[NC-1909] Gossiper in place
Errorific Dec 20, 2018
d83aeaa
[NC-1909] renamed variables to reflect them being payloads
Errorific Dec 20, 2018
a83ef6b
[NC-1909] round of spotless
Errorific Dec 20, 2018
45a230e
[NC-1909] tests back to where they were
Errorific Dec 20, 2018
078228f
[NC-1909] added gossiping of messages to the height manager
Errorific Dec 21, 2018
2ed58ba
[NC-1909] minor docs
Errorific Jan 2, 2019
d6b3c97
[NC-1909] wrong assertions import
Errorific Jan 2, 2019
e0f8ee2
[NC-1909] javadoc for IbftGossip
Errorific Jan 2, 2019
b7bf2bc
[NC-1909] first couple of tests of the gossiper
Errorific Jan 2, 2019
1f2b377
[NC-1909] coverage for ibft gossip
Errorific Jan 3, 2019
84d980d
[NC-1909] Test for new broadcast method
Errorific Jan 3, 2019
93b6270
[NC-1909] fixing errorprone checks
Errorific Jan 3, 2019
73fa0ad
[NC-1909] another final method param
Errorific Jan 3, 2019
9474970
[NC-1909] More final method params
Errorific Jan 3, 2019
f1a8d9f
[NC-1909] casting to silence mocking warnings
Errorific Jan 3, 2019
cb6e263
[NC-1909] Making sure that returned values are consumed
Errorific Jan 3, 2019
17e8a35
[NC-1909] fixing reworded error message tests
Errorific Jan 3, 2019
3981966
[NC-1909] fixes from PR comments
Errorific Jan 4, 2019
64ff2dd
[NC-1909] Added test for seen messages eviction
Errorific Jan 4, 2019
1286c8b
Merge branch 'master' into feature/NC-1909.ibft_gossip_2
Errorific Jan 4, 2019
5b4b4c7
[NC-1909] Spotless
Errorific Jan 4, 2019
9ff8f81
[NC-1909] deleted unused function after refactor
Errorific Jan 6, 2019
0085606
[NC-1909] variable that should have been final
Errorific Jan 6, 2019
810aeb5
[NC-1909] extracted controller case logic into common function
Errorific Jan 6, 2019
be718f1
[NC-1909] Fixed collection not being copied before filtering
Errorific Jan 6, 2019
591833d
[NC-1909] renamed variable at suggestion
Errorific Jan 6, 2019
64ae265
[NC-1909] method renames to reflect them operating on payloads not me…
Errorific Jan 6, 2019
352c011
Merge branch 'master' into feature/NC-1909.ibft_gossip_2
Errorific Jan 6, 2019
d9adf18
[NC-1909] fixed errorprone warning about return value being unchecked
Errorific Jan 6, 2019
3962583
[NC-1909] Fixed inverted filter
Errorific Jan 6, 2019
a6b2d47
Merge branch 'master' into feature/NC-1909.ibft_gossip_2
Errorific Jan 7, 2019
e819d1f
Merge branch 'master' into feature/NC-1909.ibft_gossip_2
Errorific Jan 7, 2019
d754dee
Merge branch 'master' into feature/NC-1909.ibft_gossip_2
Errorific Jan 9, 2019
12faec4
[NC-1909] supporting integration tests
Errorific Jan 9, 2019
4344338
[NC-1909] rewrote gossiper to take whole messages
Errorific Jan 9, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@

import static org.assertj.core.api.Assertions.assertThat;

import tech.pegasys.pantheon.consensus.ibft.ibftmessage.CommitMessage;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.CommitMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.IbftV2;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.NewRoundMessage;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.PrepareMessage;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.ProposalMessage;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.RoundChangeMessage;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.NewRoundMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.PrepareMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.ProposalMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.RoundChangeMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.Payload;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.SignedData;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
Expand Down Expand Up @@ -60,15 +60,15 @@ public static boolean msgMatchesExpected(

switch (expectedPayload.getMessageType()) {
case IbftV2.PROPOSAL:
return ProposalMessage.fromMessage(actual).decode().equals(expected);
return ProposalMessageData.fromMessageData(actual).decode().equals(expected);
case IbftV2.PREPARE:
return PrepareMessage.fromMessage(actual).decode().equals(expected);
return PrepareMessageData.fromMessageData(actual).decode().equals(expected);
case IbftV2.COMMIT:
return CommitMessage.fromMessage(actual).decode().equals(expected);
return CommitMessageData.fromMessageData(actual).decode().equals(expected);
case IbftV2.NEW_ROUND:
return NewRoundMessage.fromMessage(actual).decode().equals(expected);
return NewRoundMessageData.fromMessageData(actual).decode().equals(expected);
case IbftV2.ROUND_CHANGE:
return RoundChangeMessage.fromMessage(actual).decode().equals(expected);
return RoundChangeMessageData.fromMessageData(actual).decode().equals(expected);
default:
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package tech.pegasys.pantheon.consensus.ibft.support;

import tech.pegasys.pantheon.consensus.ibft.network.IbftMulticaster;
import tech.pegasys.pantheon.ethereum.core.Address;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;

import java.util.Collection;
Expand All @@ -32,6 +33,15 @@ public void addNetworkPeers(final Collection<ValidatorPeer> nodes) {

@Override
public void multicastToValidators(final MessageData message) {
validatorNodes.forEach(v -> v.handleReceivedMessage(message));
validatorNodes.forEach(peer -> peer.handleReceivedMessage(message));
}

@Override
public void multicastToValidatorsExcept(
final MessageData message, final Collection<Address> exceptAddresses) {
validatorNodes
.stream()
.filter(peer -> !exceptAddresses.contains(peer.getNodeAddress()))
.forEach(peer -> peer.handleReceivedMessage(message));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.consensus.ibft.support;

import static java.util.Collections.emptyList;

import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.ethereum.p2p.wire.Capability;
import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.net.SocketAddress;
import java.util.Set;

public class StubbedPeerConnection implements PeerConnection {

@Override
public void send(final Capability capability, final MessageData message)
throws PeerNotConnected {}

@Override
public Set<Capability> getAgreedCapabilities() {
return null;
}

@Override
public PeerInfo getPeer() {
return new PeerInfo(0, "IbftIntTestPeer", emptyList(), 0, BytesValue.EMPTY);
}

@Override
public void terminateConnection(final DisconnectReason reason, final boolean peerInitiated) {}

@Override
public void disconnect(final DisconnectReason reason) {}

@Override
public SocketAddress getLocalAddress() {
return null;
}

@Override
public SocketAddress getRemoteAddress() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package tech.pegasys.pantheon.consensus.ibft.support;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.mockito.Mockito.mock;
import static tech.pegasys.pantheon.ethereum.core.InMemoryStorageProvider.createInMemoryBlockchain;
import static tech.pegasys.pantheon.ethereum.core.InMemoryStorageProvider.createInMemoryWorldStateArchive;

Expand All @@ -29,6 +30,7 @@
import tech.pegasys.pantheon.consensus.ibft.IbftContext;
import tech.pegasys.pantheon.consensus.ibft.IbftEventQueue;
import tech.pegasys.pantheon.consensus.ibft.IbftExtraData;
import tech.pegasys.pantheon.consensus.ibft.IbftGossip;
import tech.pegasys.pantheon.consensus.ibft.IbftHelpers;
import tech.pegasys.pantheon.consensus.ibft.IbftProtocolSchedule;
import tech.pegasys.pantheon.consensus.ibft.RoundTimer;
Expand Down Expand Up @@ -64,6 +66,7 @@
import java.time.Instant;
import java.time.ZoneId;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -252,6 +255,9 @@ private static ControllerAndState createControllerAndFinalState(
final MessageValidatorFactory messageValidatorFactory =
new MessageValidatorFactory(proposerSelector, blockHeaderValidator, protocolContext);

// Disable Gossiping for integration tests.
final IbftGossip gossiper = mock(IbftGossip.class);

final IbftController ibftController =
new IbftController(
blockChain,
Expand All @@ -260,7 +266,9 @@ private static ControllerAndState createControllerAndFinalState(
finalState,
new IbftRoundFactory(finalState, protocolContext, protocolSchedule),
messageValidatorFactory,
protocolContext));
protocolContext),
new HashMap<>(),
gossiper);
//////////////////////////// END IBFT PantheonController ////////////////////////////

return new ControllerAndState(ibftController, finalState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftEvents;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftReceivedMessageEvent;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.CommitMessage;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.NewRoundMessage;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.PrepareMessage;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.ProposalMessage;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.RoundChangeMessage;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.CommitMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.NewRoundMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.PrepareMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.ProposalMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.RoundChangeMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.CommitPayload;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.MessageFactory;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.NewRoundPayload;
Expand All @@ -37,6 +37,7 @@
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.ethereum.p2p.wire.DefaultMessage;

import java.util.Collections;
Expand All @@ -51,6 +52,7 @@ public class ValidatorPeer {
private final Address nodeAddress;
private final KeyPair nodeKeys;
private final MessageFactory messageFactory;
private final PeerConnection peerConnection = new StubbedPeerConnection();
private List<MessageData> receivedMessages = Lists.newArrayList();

private final IbftController localNodeController;
Expand All @@ -65,19 +67,24 @@ public ValidatorPeer(
this.localNodeController = localNodeController;
}

public Address getNodeAddress() {
return nodeAddress;
}

public SignedData<ProposalPayload> injectProposal(
final ConsensusRoundIdentifier rId, final Block block) {
final SignedData<ProposalPayload> payload =
messageFactory.createSignedProposalPayload(rId, block);
injectMessage(ProposalMessage.create(payload));

injectMessage(ProposalMessageData.create(payload));
return payload;
}

public SignedData<PreparePayload> injectPrepare(
final ConsensusRoundIdentifier rId, final Hash digest) {
final SignedData<PreparePayload> payload =
messageFactory.createSignedPreparePayload(rId, digest);
injectMessage(PrepareMessage.create(payload));
injectMessage(PrepareMessageData.create(payload));
return payload;
}

Expand All @@ -86,7 +93,7 @@ public SignedData<CommitPayload> injectCommit(
final Signature commitSeal = SECP256K1.sign(digest, nodeKeys);
final SignedData<CommitPayload> payload =
messageFactory.createSignedCommitPayload(rId, digest, commitSeal);
injectMessage(CommitMessage.create(payload));
injectMessage(CommitMessageData.create(payload));
return payload;
}

Expand All @@ -97,15 +104,15 @@ public SignedData<NewRoundPayload> injectNewRound(

final SignedData<NewRoundPayload> payload =
messageFactory.createSignedNewRoundPayload(rId, roundChangeCertificate, proposalPayload);
injectMessage(NewRoundMessage.create(payload));
injectMessage(NewRoundMessageData.create(payload));
return payload;
}

public SignedData<RoundChangePayload> injectRoundChange(
final ConsensusRoundIdentifier rId, final Optional<PreparedCertificate> preparedCertificate) {
final SignedData<RoundChangePayload> payload =
messageFactory.createSignedRoundChangePayload(rId, preparedCertificate);
injectMessage(RoundChangeMessage.create(payload));
injectMessage(RoundChangeMessageData.create(payload));
return payload;
}

Expand All @@ -122,7 +129,7 @@ public void clearReceivedMessages() {
}

public void injectMessage(final MessageData msgData) {
final DefaultMessage message = new DefaultMessage(null, msgData);
final DefaultMessage message = new DefaultMessage(peerConnection, msgData);
localNodeController.handleMessageEvent(
(IbftReceivedMessageEvent) IbftEvents.fromMessage(message));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
Errorific marked this conversation as resolved.
Show resolved Hide resolved
* Copyright 2018 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.consensus.ibft;

import tech.pegasys.pantheon.consensus.ibft.ibftmessage.CommitMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.IbftV2;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.NewRoundMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.PrepareMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.ProposalMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessage.RoundChangeMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.SignedData;
import tech.pegasys.pantheon.consensus.ibft.network.IbftMulticaster;
import tech.pegasys.pantheon.crypto.SECP256K1.Signature;
import tech.pegasys.pantheon.ethereum.core.Address;
import tech.pegasys.pantheon.ethereum.p2p.api.Message;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.google.common.collect.Lists;

/** Class responsible for rebroadcasting IBFT messages to known validators */
public class IbftGossip {
private final IbftMulticaster peers;

// Size of the seenMessages cache, should end up utilising 65bytes * this number + some meta data
private final int maxSeenMessages;

// Set that starts evicting members when it hits capacity
private final Set<Signature> seenMessages =
Collections.newSetFromMap(
new LinkedHashMap<Signature, Boolean>() {
@Override
protected boolean removeEldestEntry(final Map.Entry<Signature, Boolean> eldest) {
return size() > maxSeenMessages;
}
});

IbftGossip(final IbftMulticaster peers, final int maxSeenMessages) {
this.maxSeenMessages = maxSeenMessages;
this.peers = peers;
}

/**
* Constructor that attaches gossip logic to a set of peers
*
* @param peers The always up to date set of connected peers that understand IBFT
*/
public IbftGossip(final IbftMulticaster peers) {
this(peers, 10_000);
}

/**
* Retransmit a given IBFT message to other known validators nodes
*
* @param message The raw message to be gossiped
* @return Whether the message was rebroadcast or has been ignored as a repeat
*/
public boolean gossipMessage(final Message message) {
final MessageData messageData = message.getData();
final SignedData<?> signedData;
switch (messageData.getCode()) {
case IbftV2.PROPOSAL:
signedData = ProposalMessageData.fromMessageData(messageData).decode();
break;
case IbftV2.PREPARE:
signedData = PrepareMessageData.fromMessageData(messageData).decode();
break;
case IbftV2.COMMIT:
signedData = CommitMessageData.fromMessageData(messageData).decode();
break;
case IbftV2.ROUND_CHANGE:
signedData = RoundChangeMessageData.fromMessageData(messageData).decode();
break;
case IbftV2.NEW_ROUND:
signedData = NewRoundMessageData.fromMessageData(messageData).decode();
break;
default:
throw new IllegalArgumentException(
"Received message does not conform to any recognised IBFT message structure.");
}
final Signature signature = signedData.getSignature();
if (seenMessages.contains(signature)) {
return false;
} else {
final List<Address> excludeAddressesList =
Lists.newArrayList(
message.getConnection().getPeer().getAddress(), signedData.getSender());
peers.multicastToValidatorsExcept(messageData, excludeAddressesList);
seenMessages.add(signature);
return true;
}
}
}
Loading