Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dot/netwok): check for duplicate message earlier #2435

Merged
merged 19 commits into from
Apr 4, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions dot/network/block_announce.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,14 @@ func (bm *BlockAnnounceMessage) Decode(in []byte) error {
}

// Hash returns the hash of the BlockAnnounceMessage
func (bm *BlockAnnounceMessage) Hash() common.Hash {
func (bm *BlockAnnounceMessage) Hash() (common.Hash, error) {
// scale encode each extrinsic
encMsg, _ := bm.Encode()
hash, _ := common.Blake2bHash(encMsg)
return hash
encMsg, err := bm.Encode()
if err != nil {
return common.Hash{}, err
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
}

return common.Blake2bHash(encMsg)
}

// IsHandshake returns false
Expand Down Expand Up @@ -145,8 +148,8 @@ func (*BlockAnnounceHandshake) Type() byte {
}

// Hash ...
func (*BlockAnnounceHandshake) Hash() common.Hash {
return common.Hash{}
func (*BlockAnnounceHandshake) Hash() (common.Hash, error) {
return common.Hash{}, nil
}

// IsHandshake returns true
Expand Down Expand Up @@ -174,7 +177,7 @@ func (s *Service) validateBlockAnnounceHandshake(from peer.ID, hs Handshake) err
return errors.New("invalid handshake type")
}

if bhs.GenesisHash != s.blockState.GenesisHash() {
if !bhs.GenesisHash.Equal(s.blockState.GenesisHash()) {
s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
Value: peerset.GenesisMismatch,
Reason: peerset.GenesisMismatchReason,
Expand Down
15 changes: 10 additions & 5 deletions dot/network/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package network

import (
"fmt"
"sync"

"github.com/ChainSafe/gossamer/internal/log"
Expand All @@ -24,13 +25,17 @@ func newGossip() *gossip {
}

// hasSeen broadcasts messages that have not been seen
func (g *gossip) hasSeen(msg NotificationsMessage) bool {
func (g *gossip) hasSeen(msg NotificationsMessage) (bool, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit named returns would be nice

Suggested change
func (g *gossip) hasSeen(msg NotificationsMessage) (bool, error) {
func (g *gossip) hasSeen(msg NotificationsMessage) (seen bool, err error) {

// check if message has not been seen
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
if seen, ok := g.seen.Load(msg.Hash()); !ok || !seen.(bool) {
msgHash, err := msg.Hash()
if err != nil {
return false, fmt.Errorf("could not hash notification message: %w", err)
}
if seen, ok := g.seen.Load(msgHash); !ok || !seen.(bool) {
// set message to has been seen
g.seen.Store(msg.Hash(), true)
return false
g.seen.Store(msgHash, true)
return false, nil
}

return true
return true, nil
}
9 changes: 6 additions & 3 deletions dot/network/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,23 +89,26 @@ func TestGossip(t *testing.T) {

time.Sleep(TestMessageTimeout)

if hasSeenB, ok := nodeB.gossip.seen.Load(announceMessage.Hash()); !ok || hasSeenB.(bool) == false {
hash, err := announceMessage.Hash()
require.NoError(t, err)

if hasSeenB, ok := nodeB.gossip.seen.Load(hash); !ok || hasSeenB.(bool) == false {
t.Error(
"node B did not receive block request message from node A",
"\nreceived:", hasSeenB,
"\nexpected:", true,
)
}

if hasSeenC, ok := nodeC.gossip.seen.Load(announceMessage.Hash()); !ok || hasSeenC.(bool) == false {
if hasSeenC, ok := nodeC.gossip.seen.Load(hash); !ok || hasSeenC.(bool) == false {
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
t.Error(
"node C did not receive block request message from node B",
"\nreceived:", hasSeenC,
"\nexpected:", true,
)
}

if hasSeenA, ok := nodeA.gossip.seen.Load(announceMessage.Hash()); !ok || hasSeenA.(bool) == false {
if hasSeenA, ok := nodeA.gossip.seen.Load(hash); !ok || hasSeenA.(bool) == false {
t.Error(
"node A did not receive block request message from node C",
"\nreceived:", hasSeenA,
Expand Down
12 changes: 7 additions & 5 deletions dot/network/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Message interface {
type NotificationsMessage interface {
Message
Type() byte
Hash() common.Hash
Hash() (common.Hash, error)
IsHandshake() bool
}

Expand Down Expand Up @@ -389,11 +389,13 @@ func (cm *ConsensusMessage) Decode(in []byte) error {
}

// Hash returns the Hash of ConsensusMessage
func (cm *ConsensusMessage) Hash() common.Hash {
func (cm *ConsensusMessage) Hash() (common.Hash, error) {
// scale encode each extrinsic
encMsg, _ := cm.Encode()
hash, _ := common.Blake2bHash(encMsg)
return hash
encMsg, err := cm.Encode()
if err != nil {
return common.Hash{}, err
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
}
return common.Blake2bHash(encMsg)
}

// IsHandshake returns false
Expand Down
8 changes: 7 additions & 1 deletion dot/network/message_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func (m *messageCache) put(peer peer.ID, msg NotificationsMessage) (bool, error)
func (m *messageCache) exists(peer peer.ID, msg NotificationsMessage) bool {
key, err := generateCacheKey(peer, msg)
if err != nil {
logger.Errorf("could not generate cache key: %w", err)
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
return false
}

Expand All @@ -67,7 +68,12 @@ func generateCacheKey(peer peer.ID, msg NotificationsMessage) ([]byte, error) {
return nil, errors.New("cache does not support handshake messages")
}

peerMsgHash, err := common.Blake2bHash(append([]byte(peer), msg.Hash().ToBytes()...))
msgHash, err := msg.Hash()
if err != nil {
return nil, err
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
}

peerMsgHash, err := common.Blake2bHash(append([]byte(peer), msgHash.ToBytes()...))
if err != nil {
return nil, err
}
Expand Down
38 changes: 25 additions & 13 deletions dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,20 @@ func (s *Service) createNotificationsMessageHandler(
return fmt.Errorf("%w: expected %T but got %T", errMessageTypeNotValid, (NotificationsMessage)(nil), msg)
}

hasSeen, err := s.gossip.hasSeen(msg)
if err != nil {
return fmt.Errorf("could not check if message was seen before: %w", err)
}

if hasSeen {
// report peer if we get duplicate gossip message.
s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
Value: peerset.DuplicateGossipValue,
Reason: peerset.DuplicateGossipReason,
}, peer)
return nil
}

if msg.IsHandshake() {
logger.Tracef("received handshake on notifications sub-protocol %s from peer %s, message is: %s",
info.protocolID, stream.Conn().RemotePeer(), msg)
Expand Down Expand Up @@ -207,16 +221,7 @@ func (s *Service) createNotificationsMessageHandler(
return nil
}

if !s.gossip.hasSeen(msg) {
s.broadcastExcluding(info, peer, msg)
return nil
}

// report peer if we get duplicate gossip message.
s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
Value: peerset.DuplicateGossipValue,
Reason: peerset.DuplicateGossipReason,
}, peer)
s.broadcastExcluding(info, peer, msg)
return nil
}
}
Expand All @@ -238,7 +243,14 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc
return
}

if support, err := s.host.supportsProtocol(peer, info.protocolID); err != nil || !support {
support, err := s.host.supportsProtocol(peer, info.protocolID)
if err != nil {
logger.Criticalf("could not check if protocol %s is supported by peer %s: %s", info.protocolID, peer, err)
} else if !support {
logger.Criticalf("protocol %s is not supported by peer %s", info.protocolID, peer)
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
}

if err != nil || !support {
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
Value: peerset.BadProtocolValue,
Reason: peerset.BadProtocolReason,
Expand Down Expand Up @@ -319,7 +331,7 @@ func (s *Service) sendHandshake(peer peer.ID, hs Handshake, info *notificationsP
peer, info.protocolID, hs)
stream, err := s.host.send(peer, info.protocolID, hs)
if err != nil {
logger.Tracef("failed to send message to peer %s: %s", peer, err)
logger.Tracef("failed to send handshake to peer %s: %s", peer, err)
// don't need to close the stream here, as it's nil!
return nil, err
}
Expand All @@ -345,7 +357,7 @@ func (s *Service) sendHandshake(peer peer.ID, hs Handshake, info *notificationsP
}

if hsResponse.err != nil {
logger.Tracef("failed to read handshake from peer %s using protocol %s: %s", peer, info.protocolID, err)
logger.Tracef("failed to read handshake from peer %s using protocol %s: %s", peer, info.protocolID, hsResponse.err)
closeOutboundStream(info, peer, stream)
return nil, hsResponse.err
}
Expand Down
26 changes: 19 additions & 7 deletions dot/network/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,12 @@ func (tm *TransactionMessage) Decode(in []byte) error {
}

// Hash returns the hash of the TransactionMessage
func (tm *TransactionMessage) Hash() common.Hash {
encMsg, _ := tm.Encode()
hash, _ := common.Blake2bHash(encMsg)
return hash
func (tm *TransactionMessage) Hash() (common.Hash, error) {
encMsg, err := tm.Encode()
if err != nil {
return common.Hash{}, nil
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
}
return common.Blake2bHash(encMsg)
}

// IsHandshake returns false
Expand Down Expand Up @@ -93,8 +95,8 @@ func (*transactionHandshake) Type() byte {
}

// Hash ...
func (*transactionHandshake) Hash() common.Hash {
return common.Hash{}
func (*transactionHandshake) Hash() (common.Hash, error) {
return common.Hash{}, nil
}

// IsHandshake returns true
Expand Down Expand Up @@ -129,6 +131,7 @@ func (s *Service) startTxnBatchProcessing(txnBatchCh chan *BatchMessage, slotDur
case txnMsg := <-txnBatchCh:
propagate, err := s.handleTransactionMessage(txnMsg.peer, txnMsg.msg)
if err != nil {
logger.Warnf("could not handle transaction message: %s ", err)
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
s.host.closeProtocolStream(protocolID, txnMsg.peer)
continue
}
Expand All @@ -137,7 +140,16 @@ func (s *Service) startTxnBatchProcessing(txnBatchCh chan *BatchMessage, slotDur
continue
}

if !s.gossip.hasSeen(txnMsg.msg) {
// TODO: Check if s.gossip.hasSeen should be moved before handleTransactionMessage.
// That we could avoid handling the transactions again, which we would have already seen.
qdm12 marked this conversation as resolved.
Show resolved Hide resolved

hasSeen, err := s.gossip.hasSeen(txnMsg.msg)
if err != nil {
s.host.closeProtocolStream(protocolID, txnMsg.peer)
logger.Debugf("could not check if message was seen before: %s", err)
continue
}
if !hasSeen {
s.broadcastExcluding(s.notificationsProtocols[TransactionMsgType], txnMsg.peer, txnMsg.msg)
}
}
Expand Down
4 changes: 4 additions & 0 deletions dot/peerset/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ func (h *Handler) RemovePeer(setID int, peers ...peer.ID) {

// ReportPeer reports ReputationChange according to the peer behaviour.
func (h *Handler) ReportPeer(rep ReputationChange, peers ...peer.ID) {
for _, pid := range peers {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a way to check logging level? so we don't range across peers if package logging level isn't trace or debug.

Copy link
Contributor

@qdm12 qdm12 Apr 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We always ever pass a single peer for this method (slightly related comment), so it's fine really.

logger.Debugf("reporting reputation change of %d to peer %s, reason: %s", rep.Value, pid, rep.Reason)
}

kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
h.actionQueue <- action{
actionCall: reportPeer,
reputation: rep,
Expand Down
1 change: 1 addition & 0 deletions dot/state/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func headerKey(hash common.Hash) []byte {
}

// headerHashKey = headerHashPrefix + num (uint64 big endian)
// TODO: Shouldn't we change this block number to uint32?
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
func headerHashKey(number uint64) []byte {
return append(headerHashPrefix, encodeBlockNumber(number)...)
}
Expand Down
2 changes: 1 addition & 1 deletion dot/types/block_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func NewEmptyBlockData() *BlockData {

// Number returns the block header number.
func (bd *BlockData) Number() uint {
return bd.Header.Number
return uint(bd.Header.Number)
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
}

func (bd *BlockData) String() string {
Expand Down
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ require (
github.com/btcsuite/btcd v0.22.0-beta // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
github.com/deckarep/golang-set v1.8.0 // indirect
Expand Down Expand Up @@ -130,7 +130,7 @@ require (
github.com/libp2p/go-yamux/v2 v2.2.0 // indirect
github.com/libp2p/zeroconf/v2 v2.1.0 // indirect
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect
github.com/mattn/go-colorable v0.1.9 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/miekg/dns v1.1.43 // indirect
Expand Down Expand Up @@ -159,9 +159,8 @@ require (
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/rs/cors v1.7.0 // indirect
github.com/russross/blackfriday/v2 v2.0.1 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect
github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 // indirect
github.com/stretchr/objx v0.1.1 // indirect
github.com/tklauser/go-sysconf v0.3.5 // indirect
Expand All @@ -178,8 +177,9 @@ require (
go.uber.org/zap v1.19.0 // indirect
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect
golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/tools v0.1.9 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/appengine v1.6.6 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
Expand Down
Loading