Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dot/netwok): check for duplicate message earlier #2435

Merged
merged 19 commits into from
Apr 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions dot/network/block_announce.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,14 @@ func (bm *BlockAnnounceMessage) Decode(in []byte) error {
}

// Hash returns the hash of the BlockAnnounceMessage
func (bm *BlockAnnounceMessage) Hash() common.Hash {
func (bm *BlockAnnounceMessage) Hash() (common.Hash, error) {
// scale encode each extrinsic
encMsg, _ := bm.Encode()
hash, _ := common.Blake2bHash(encMsg)
return hash
encMsg, err := bm.Encode()
if err != nil {
return common.Hash{}, fmt.Errorf("cannot encode message: %w", err)
}

return common.Blake2bHash(encMsg)
}

// IsHandshake returns false
Expand Down Expand Up @@ -144,9 +147,15 @@ func (*BlockAnnounceHandshake) Type() byte {
return 0
}

// Hash ...
func (*BlockAnnounceHandshake) Hash() common.Hash {
return common.Hash{}
// Hash returns blake2b hash of block announce handshake.
func (hs *BlockAnnounceHandshake) Hash() (common.Hash, error) {
// scale encode each extrinsic
encMsg, err := hs.Encode()
if err != nil {
return common.Hash{}, fmt.Errorf("cannot encode handshake: %w", err)
}

return common.Blake2bHash(encMsg)
}

// IsHandshake returns true
Expand Down Expand Up @@ -174,7 +183,7 @@ func (s *Service) validateBlockAnnounceHandshake(from peer.ID, hs Handshake) err
return errors.New("invalid handshake type")
}

if bhs.GenesisHash != s.blockState.GenesisHash() {
if !bhs.GenesisHash.Equal(s.blockState.GenesisHash()) {
s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
Value: peerset.GenesisMismatch,
Reason: peerset.GenesisMismatchReason,
Expand Down
21 changes: 14 additions & 7 deletions dot/network/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package network

import (
"fmt"
"sync"

"github.com/ChainSafe/gossamer/internal/log"
Expand All @@ -25,19 +26,25 @@ func newGossip() *gossip {
}
}

// hasSeen broadcasts messages that have not been seen
func (g *gossip) hasSeen(msg NotificationsMessage) bool {
// check if message has not been seen
msgHash := msg.Hash()
// hasSeen checks if we have seen the given message before.
func (g *gossip) hasSeen(msg NotificationsMessage) (bool, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit named returns would be nice

Suggested change
func (g *gossip) hasSeen(msg NotificationsMessage) (bool, error) {
func (g *gossip) hasSeen(msg NotificationsMessage) (seen bool, err error) {

msgHash, err := msg.Hash()
if err != nil {
return false, fmt.Errorf("could not hash notification message: %w", err)
}

g.seenMutex.Lock()
defer g.seenMutex.Unlock()

// check if message has not been seen
_, ok := g.seenMap[msgHash]
if !ok {
// set message to has been seen
g.seenMap[msgHash] = struct{}{}
return false
if !msg.IsHandshake() {
g.seenMap[msgHash] = struct{}{}
}
return false, nil
}

return true
return true, nil
}
9 changes: 6 additions & 3 deletions dot/network/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,15 @@ func TestGossip(t *testing.T) {

time.Sleep(TestMessageTimeout)

_, ok := nodeB.gossip.seenMap[announceMessage.Hash()]
hash, err := announceMessage.Hash()
require.NoError(t, err)

_, ok := nodeB.gossip.seenMap[hash]
require.True(t, ok, "node B did not receive block request message from node A")

_, ok = nodeC.gossip.seenMap[announceMessage.Hash()]
_, ok = nodeC.gossip.seenMap[hash]
require.True(t, ok, "node C did not receive block request message from node B")

_, ok = nodeA.gossip.seenMap[announceMessage.Hash()]
_, ok = nodeA.gossip.seenMap[hash]
require.True(t, ok, "node A did not receive block request message from node C")
}
12 changes: 7 additions & 5 deletions dot/network/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Message interface {
type NotificationsMessage interface {
Message
Type() byte
Hash() common.Hash
Hash() (common.Hash, error)
IsHandshake() bool
}

Expand Down Expand Up @@ -389,11 +389,13 @@ func (cm *ConsensusMessage) Decode(in []byte) error {
}

// Hash returns the Hash of ConsensusMessage
func (cm *ConsensusMessage) Hash() common.Hash {
func (cm *ConsensusMessage) Hash() (common.Hash, error) {
// scale encode each extrinsic
encMsg, _ := cm.Encode()
hash, _ := common.Blake2bHash(encMsg)
return hash
encMsg, err := cm.Encode()
if err != nil {
return common.Hash{}, fmt.Errorf("cannot encode message: %w", err)
}
return common.Blake2bHash(encMsg)
}

// IsHandshake returns false
Expand Down
9 changes: 8 additions & 1 deletion dot/network/message_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package network

import (
"errors"
"fmt"
"time"

"github.com/ChainSafe/gossamer/lib/common"
Expand Down Expand Up @@ -55,6 +56,7 @@ func (m *messageCache) put(peer peer.ID, msg NotificationsMessage) (bool, error)
func (m *messageCache) exists(peer peer.ID, msg NotificationsMessage) bool {
key, err := generateCacheKey(peer, msg)
if err != nil {
logger.Errorf("could not generate cache key: %s", err)
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
return false
}

Expand All @@ -67,7 +69,12 @@ func generateCacheKey(peer peer.ID, msg NotificationsMessage) ([]byte, error) {
return nil, errors.New("cache does not support handshake messages")
}

peerMsgHash, err := common.Blake2bHash(append([]byte(peer), msg.Hash().ToBytes()...))
msgHash, err := msg.Hash()
if err != nil {
return nil, fmt.Errorf("cannot hash notification message: %w", err)
}

peerMsgHash, err := common.Blake2bHash(append([]byte(peer), msgHash.ToBytes()...))
if err != nil {
return nil, err
}
Expand Down
37 changes: 24 additions & 13 deletions dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,20 @@ func (s *Service) createNotificationsMessageHandler(
return fmt.Errorf("%w: expected %T but got %T", errMessageTypeNotValid, (NotificationsMessage)(nil), msg)
}

hasSeen, err := s.gossip.hasSeen(msg)
if err != nil {
return fmt.Errorf("could not check if message was seen before: %w", err)
}

if hasSeen {
// report peer if we get duplicate gossip message.
s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
Value: peerset.DuplicateGossipValue,
Reason: peerset.DuplicateGossipReason,
}, peer)
return nil
}

if msg.IsHandshake() {
logger.Tracef("received handshake on notifications sub-protocol %s from peer %s, message is: %s",
info.protocolID, stream.Conn().RemotePeer(), msg)
Expand Down Expand Up @@ -207,16 +221,7 @@ func (s *Service) createNotificationsMessageHandler(
return nil
}

if !s.gossip.hasSeen(msg) {
s.broadcastExcluding(info, peer, msg)
return nil
}

// report peer if we get duplicate gossip message.
s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
Value: peerset.DuplicateGossipValue,
Reason: peerset.DuplicateGossipReason,
}, peer)
s.broadcastExcluding(info, peer, msg)
return nil
}
}
Expand All @@ -238,7 +243,13 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc
return
}

if support, err := s.host.supportsProtocol(peer, info.protocolID); err != nil || !support {
support, err := s.host.supportsProtocol(peer, info.protocolID)
if err != nil {
logger.Errorf("could not check if protocol %s is supported by peer %s: %s", info.protocolID, peer, err)
return
}

if !support {
s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
Value: peerset.BadProtocolValue,
Reason: peerset.BadProtocolReason,
Expand Down Expand Up @@ -319,7 +330,7 @@ func (s *Service) sendHandshake(peer peer.ID, hs Handshake, info *notificationsP
peer, info.protocolID, hs)
stream, err := s.host.send(peer, info.protocolID, hs)
if err != nil {
logger.Tracef("failed to send message to peer %s: %s", peer, err)
logger.Tracef("failed to send handshake to peer %s: %s", peer, err)
// don't need to close the stream here, as it's nil!
return nil, err
}
Expand All @@ -345,7 +356,7 @@ func (s *Service) sendHandshake(peer peer.ID, hs Handshake, info *notificationsP
}

if hsResponse.err != nil {
logger.Tracef("failed to read handshake from peer %s using protocol %s: %s", peer, info.protocolID, err)
logger.Tracef("failed to read handshake from peer %s using protocol %s: %s", peer, info.protocolID, hsResponse.err)
closeOutboundStream(info, peer, stream)
return nil, hsResponse.err
}
Expand Down
16 changes: 9 additions & 7 deletions dot/network/notifications_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,9 @@ func TestCreateNotificationsMessageHandler_BlockAnnounceHandshake(t *testing.T)
Roles: 4,
BestBlockNumber: 77,
BestBlockHash: common.Hash{1},
GenesisHash: common.Hash{2},
// we are using a different genesis here, thus this
// handshake would be validated to be incorrect.
GenesisHash: common.Hash{2},
}

err = handler(stream, testHandshake)
Expand Down Expand Up @@ -367,43 +369,43 @@ func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) {
require.Len(t, txnBatch, 1)

msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}, {3, 3}},
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 2)

msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}, {3, 3}, {4, 4}},
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 3)

msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}},
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 4)

msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}, {6, 6}},
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 5)

// reached batch size limit, below transaction will not be included in batch.
msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}, {6, 6}, {7, 7}},
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 5)

msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}, {6, 6}, {7, 7}, {8, 8}},
}
// wait for transaction batch channel to process.
time.Sleep(1300 * time.Millisecond)
Expand Down
26 changes: 19 additions & 7 deletions dot/network/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,12 @@ func (tm *TransactionMessage) Decode(in []byte) error {
}

// Hash returns the hash of the TransactionMessage
func (tm *TransactionMessage) Hash() common.Hash {
encMsg, _ := tm.Encode()
hash, _ := common.Blake2bHash(encMsg)
return hash
func (tm *TransactionMessage) Hash() (common.Hash, error) {
encMsg, err := tm.Encode()
if err != nil {
return common.Hash{}, fmt.Errorf("could not encode message: %w", err)
}
return common.Blake2bHash(encMsg)
}

// IsHandshake returns false
Expand Down Expand Up @@ -93,8 +95,8 @@ func (*transactionHandshake) Type() byte {
}

// Hash ...
func (*transactionHandshake) Hash() common.Hash {
return common.Hash{}
func (*transactionHandshake) Hash() (common.Hash, error) {
return common.Hash{}, nil
}

// IsHandshake returns true
Expand Down Expand Up @@ -129,6 +131,7 @@ func (s *Service) startTxnBatchProcessing(txnBatchCh chan *BatchMessage, slotDur
case txnMsg := <-txnBatchCh:
propagate, err := s.handleTransactionMessage(txnMsg.peer, txnMsg.msg)
if err != nil {
logger.Warnf("could not handle transaction message: %s", err)
s.host.closeProtocolStream(protocolID, txnMsg.peer)
continue
}
Expand All @@ -137,7 +140,16 @@ func (s *Service) startTxnBatchProcessing(txnBatchCh chan *BatchMessage, slotDur
continue
}

if !s.gossip.hasSeen(txnMsg.msg) {
// TODO: Check if s.gossip.hasSeen should be moved before handleTransactionMessage. #2445
// That we could avoid handling the transactions again, which we would have already seen.

hasSeen, err := s.gossip.hasSeen(txnMsg.msg)
if err != nil {
s.host.closeProtocolStream(protocolID, txnMsg.peer)
logger.Debugf("could not check if message was seen before: %s", err)
continue
}
if !hasSeen {
s.broadcastExcluding(s.notificationsProtocols[TransactionMsgType], txnMsg.peer, txnMsg.msg)
}
}
Expand Down
4 changes: 4 additions & 0 deletions dot/peerset/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ func (h *Handler) RemovePeer(setID int, peers ...peer.ID) {

// ReportPeer reports ReputationChange according to the peer behaviour.
func (h *Handler) ReportPeer(rep ReputationChange, peers ...peer.ID) {
for _, pid := range peers {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a way to check logging level? so we don't range across peers if package logging level isn't trace or debug.

Copy link
Contributor

@qdm12 qdm12 Apr 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We always ever pass a single peer for this method (slightly related comment), so it's fine really.

logger.Debugf("reporting reputation change of %d to peer %s, reason: %s", rep.Value, pid, rep.Reason)
}

kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
h.actionQueue <- action{
actionCall: reportPeer,
reputation: rep,
Expand Down
4 changes: 2 additions & 2 deletions lib/grandpa/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ func (*GrandpaHandshake) Type() byte {
}

// Hash ...
func (*GrandpaHandshake) Hash() common.Hash {
return common.Hash{}
func (*GrandpaHandshake) Hash() (common.Hash, error) {
return common.Hash{}, nil
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
}

// IsHandshake returns true
Expand Down