Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

task(lib/grandpa): ensure messages are stored and re-processed when needed #2107

Merged
merged 20 commits into from
Jan 11, 2022
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 8 additions & 15 deletions dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,21 +288,9 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc
return
}

if s.host.messageCache != nil {
added, err := s.host.messageCache.put(peer, msg)
if err != nil {
logger.Errorf("failed to add message to cache for peer %s: %s", peer, err)
return
}

// TODO: ensure grandpa stores *all* previously received votes and discards them
// only when they are for already finalised rounds; currently this causes issues
// because a vote might be received slightly too early, causing a round mismatch err,
// causing grandpa to discard the vote. (#1855)
_, isConsensusMsg := msg.(*ConsensusMessage)
if !added && !isConsensusMsg {
return
}
if (s.host.messageCache != nil) && (s.host.messageCache.exists(peer, msg)) {
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
// message has already been sent
return
}

// we've completed the handshake with the peer, send message directly
Expand All @@ -315,6 +303,11 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc
closeOutboundStream(info, peer, stream)
}
return
} else if s.host.messageCache != nil {
if _, err := s.host.messageCache.put(peer, msg); err != nil {
logger.Errorf("failed to add message to cache for peer %s: %w", peer, err)
return
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
}
}

logger.Tracef("successfully sent message on protocol %s to peer %s: message=", info.protocolID, peer, msg)
Expand Down
7 changes: 6 additions & 1 deletion lib/grandpa/grandpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ func (s *Service) playGrandpaRound() error {
}

logger.Debug("receiving pre-vote messages...")
go s.receiveMessages(ctx)
go s.receiveVoteMessages(ctx)
time.Sleep(s.interval)

if s.paused.Load().(bool) {
Expand All @@ -507,6 +507,9 @@ func (s *Service) playGrandpaRound() error {

logger.Debugf("sending pre-vote message %s...", pv)
roundComplete := make(chan struct{})
// roundComplete is a signal channel which is closed when the round completes
// (will receive the default value of channel's type), so we don't need to
// explicitly send a value.
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
defer close(roundComplete)

// continue to send prevote messages until round is done
Expand Down Expand Up @@ -550,6 +553,8 @@ func (s *Service) sendVoteMessage(stage Subround, msg *VoteMessage, roundComplet
ticker := time.NewTicker(s.interval * 4)
defer ticker.Stop()

// Though, this looks like we are sending messages multiple times,
// caching would make sure that they are being sent only once.
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
for {
if s.paused.Load().(bool) {
return
Expand Down
11 changes: 10 additions & 1 deletion lib/grandpa/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,20 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network.
case *CommitMessage:
return nil, h.handleCommitMessage(msg)
case *NeighbourMessage:
// we can afford to not retry handling neighbour message, if it errors.
return nil, h.handleNeighbourMessage(msg)
case *CatchUpRequest:
return h.handleCatchUpRequest(msg)
case *CatchUpResponse:
return nil, h.handleCatchUpResponse(msg)
err := h.handleCatchUpResponse(msg)
if errors.Is(err, blocktree.ErrNodeNotFound) {
// TODO: we are adding these messages to reprocess them again, but we
// haven't added code to reprocess them. Do that.
// Also, revisit if we need to add these message in synchronous manner
// or not. If not, change catchUpResponseMessages to a normal map. #1531
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
h.grandpa.tracker.addCatchUpResponse(msg)
}
return nil, err
default:
return nil, ErrInvalidMessageType
}
Expand Down
29 changes: 18 additions & 11 deletions lib/grandpa/message_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import (
"github.com/ChainSafe/gossamer/lib/crypto/ed25519"
)

// tracker keeps track of messages that have been received that have failed to validate with ErrBlockDoesNotExist
// these messages may be needed again in the case that we are slightly out of sync with the rest of the network
// tracker keeps track of messages that have been received, but have failed to
// validate with ErrBlockDoesNotExist. These messages may be needed again in the
// case that we are slightly out of sync with the rest of the network.
type tracker struct {
blockState BlockState
handler *MessageHandler
Expand All @@ -23,19 +24,20 @@ type tracker struct {
mapLock sync.Mutex
in chan *types.Block // receive imported block from BlockState
stopped chan struct{}
// round(uint64) is used as key and *CatchUpResponse as value
catchUpResponseMessages sync.Map
}

func newTracker(bs BlockState, handler *MessageHandler) *tracker {
in := bs.GetImportedBlockNotifierChannel()

return &tracker{
blockState: bs,
handler: handler,
voteMessages: make(map[common.Hash]map[ed25519.PublicKeyBytes]*networkVoteMessage),
commitMessages: make(map[common.Hash]*CommitMessage),
mapLock: sync.Mutex{},
in: in,
stopped: make(chan struct{}),
blockState: bs,
handler: handler,
voteMessages: make(map[common.Hash]map[ed25519.PublicKeyBytes]*networkVoteMessage),
commitMessages: make(map[common.Hash]*CommitMessage),
mapLock: sync.Mutex{},
in: bs.GetImportedBlockNotifierChannel(),
stopped: make(chan struct{}),
catchUpResponseMessages: sync.Map{},
}
}

Expand Down Expand Up @@ -71,6 +73,10 @@ func (t *tracker) addCommit(cm *CommitMessage) {
t.commitMessages[cm.Vote.Hash] = cm
}

func (t *tracker) addCatchUpResponse(cr *CatchUpResponse) {
t.catchUpResponseMessages.Store(cr.Round, cr)
}

func (t *tracker) handleBlocks() {
for {
select {
Expand All @@ -93,6 +99,7 @@ func (t *tracker) handleBlock(b *types.Block) {
h := b.Header.Hash()
if vms, has := t.voteMessages[h]; has {
for _, v := range vms {
// handleMessage would never error for vote message
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe update the warn log? :p

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure what you want here? Add this comment in log or remove the log?

_, err := t.handler.handleMessage(v.from, v.msg)
if err != nil {
logger.Warnf("failed to handle vote message %v: %s", v, err)
Expand Down
6 changes: 3 additions & 3 deletions lib/grandpa/message_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestMessageTracker_ValidateMessage(t *testing.T) {
msg: msg,
}

_, err = gs.validateMessage("", msg)
_, err = gs.validateVoteMessage("", msg)
require.Equal(t, err, ErrBlockDoesNotExist)
require.Equal(t, expected, gs.tracker.voteMessages[fake.Hash()][kr.Alice().Public().(*ed25519.PublicKey).AsBytes()])
}
Expand Down Expand Up @@ -69,7 +69,7 @@ func TestMessageTracker_SendMessage(t *testing.T) {
msg: msg,
}

_, err = gs.validateMessage("", msg)
_, err = gs.validateVoteMessage("", msg)
require.Equal(t, err, ErrBlockDoesNotExist)
require.Equal(t, expected, gs.tracker.voteMessages[next.Hash()][kr.Alice().Public().(*ed25519.PublicKey).AsBytes()])

Expand Down Expand Up @@ -115,7 +115,7 @@ func TestMessageTracker_ProcessMessage(t *testing.T) {
msg: msg,
}

_, err = gs.validateMessage("", msg)
_, err = gs.validateVoteMessage("", msg)
require.Equal(t, ErrBlockDoesNotExist, err)
require.Equal(t, expected, gs.tracker.voteMessages[next.Hash()][kr.Alice().Public().(*ed25519.PublicKey).AsBytes()])

Expand Down
7 changes: 1 addition & 6 deletions lib/grandpa/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,7 @@ func (hs *GrandpaHandshake) Encode() ([]byte, error) {

// Decode the message into a GrandpaHandshake
func (hs *GrandpaHandshake) Decode(in []byte) error {
err := scale.Unmarshal(in, hs)
if err != nil {
return err
}

return nil
return scale.Unmarshal(in, hs)
}

// Type ...
Expand Down
12 changes: 6 additions & 6 deletions lib/grandpa/vote_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ type networkVoteMessage struct {
msg *VoteMessage
}

// receiveMessages receives messages from the in channel until the specified condition is met
func (s *Service) receiveMessages(ctx context.Context) {
// receiveVoteMessages receives messages from the in channel until the specified condition is met
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
func (s *Service) receiveVoteMessages(ctx context.Context) {
for {
select {
case msg, ok := <-s.in:
Expand Down Expand Up @@ -65,7 +65,7 @@ func (s *Service) receiveMessages(ctx context.Context) {
logger.Warnf("unsupported stage %s", vm.Message.Stage.String())
}

v, err := s.validateMessage(msg.from, vm)
v, err := s.validateVoteMessage(msg.from, vm)
if err != nil {
logger.Debugf("failed to validate vote message %v: %s", vm, err)
continue
Expand Down Expand Up @@ -122,9 +122,9 @@ func (s *Service) createSignedVoteAndVoteMessage(vote *Vote, stage Subround) (*S
return pc, vm, nil
}

// validateMessage validates a VoteMessage and adds it to the current votes
// validateVoteMessage validates a VoteMessage and adds it to the current votes
// it returns the resulting vote if validated, error otherwise
func (s *Service) validateMessage(from peer.ID, m *VoteMessage) (*Vote, error) {
func (s *Service) validateVoteMessage(from peer.ID, m *VoteMessage) (*Vote, error) {
// make sure round does not increment while VoteMessage is being validated
s.roundLock.Lock()
defer s.roundLock.Unlock()
Expand Down Expand Up @@ -153,7 +153,7 @@ func (s *Service) validateMessage(from peer.ID, m *VoteMessage) (*Vote, error) {
return nil, err
}

// check that setIDs match
// vote is considered invalid if set ID do not match.
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
if m.SetID != s.state.setID {
return nil, ErrSetIDMismatch
}
Expand Down
12 changes: 6 additions & 6 deletions lib/grandpa/vote_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func TestValidateMessage_Valid(t *testing.T) {
require.NoError(t, err)
gs.keypair = kr.Bob().(*ed25519.Keypair)

vote, err := gs.validateMessage("", msg)
vote, err := gs.validateVoteMessage("", msg)
require.NoError(t, err)
require.Equal(t, h.Hash(), vote.Hash)
}
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestValidateMessage_InvalidSignature(t *testing.T) {

msg.Message.Signature[63] = 0

_, err = gs.validateMessage("", msg)
_, err = gs.validateVoteMessage("", msg)
require.Equal(t, err, ErrInvalidSignature)
}

Expand Down Expand Up @@ -253,7 +253,7 @@ func TestValidateMessage_SetIDMismatch(t *testing.T) {

gs.state.setID = 1

_, err = gs.validateMessage("", msg)
_, err = gs.validateVoteMessage("", msg)
require.Equal(t, err, ErrSetIDMismatch)
}

Expand Down Expand Up @@ -298,7 +298,7 @@ func TestValidateMessage_Equivocation(t *testing.T) {
require.NoError(t, err)
gs.keypair = kr.Bob().(*ed25519.Keypair)

_, err = gs.validateMessage("", msg)
_, err = gs.validateVoteMessage("", msg)
require.Equal(t, ErrEquivocation, err, gs.prevotes)
}

Expand Down Expand Up @@ -333,7 +333,7 @@ func TestValidateMessage_BlockDoesNotExist(t *testing.T) {
require.NoError(t, err)
gs.keypair = kr.Bob().(*ed25519.Keypair)

_, err = gs.validateMessage("", msg)
_, err = gs.validateVoteMessage("", msg)
require.Equal(t, err, ErrBlockDoesNotExist)
}

Expand Down Expand Up @@ -374,6 +374,6 @@ func TestValidateMessage_IsNotDescendant(t *testing.T) {
require.NoError(t, err)
gs.keypair = kr.Bob().(*ed25519.Keypair)

_, err = gs.validateMessage("", msg)
_, err = gs.validateVoteMessage("", msg)
require.Equal(t, errInvalidVoteBlock, err, gs.prevotes)
}
2 changes: 1 addition & 1 deletion tests/stress/stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func TestSync_SingleBlockProducer(t *testing.T) {

numCmps := 10
for i := 0; i < numCmps; i++ {
time.Sleep(time.Second)
time.Sleep(3 * time.Second)
t.Log("comparing...", i)
hashes, err := compareBlocksByNumberWithRetry(t, nodes, strconv.Itoa(i))
if len(hashes) > 1 || len(hashes) == 0 {
Expand Down