Skip to content

Commit

Permalink
lib/grandpa: ensure messages are stored and re-processed when needed
Browse files Browse the repository at this point in the history
  • Loading branch information
kishansagathiya committed Dec 9, 2021
1 parent 3951069 commit e4c791e
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 36 deletions.
8 changes: 5 additions & 3 deletions dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,13 +289,15 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc
return
}

// this is the place
// TODO: ensure grandpa stores *all* previously received votes and discards them
// only when they are for already finalised rounds; currently this causes issues
// because a vote might be received slightly too early, causing a round mismatch err,
// causing grandpa to discard the vote. (#1855)
_, isConsensusMsg := msg.(*ConsensusMessage)
if !added && !isConsensusMsg {
// added means message was already sent.
// for consensus messages we are happy to send them again, but not for other messages
// TODO: Is it bad behaviour to send consensus message multiple times?
// TODO: What to do if vote does not reach because of network related issue?
if !added {
return
}
}
Expand Down
1 change: 0 additions & 1 deletion lib/grandpa/grandpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,6 @@ func (s *Service) sendVoteMessage(stage Subround, msg *VoteMessage, roundComplet
ticker := time.NewTicker(s.interval * 4)
defer ticker.Stop()

// this for loop might be the place where messages are getting rebroadcasted
for {
if s.paused.Load().(bool) {
return
Expand Down
13 changes: 6 additions & 7 deletions lib/grandpa/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,15 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network.
// if it errors.
return nil, h.handleNeighbourMessage(msg)
case *CatchUpRequest:
notificationsMessage, err := h.handleCatchUpRequest(msg)
if err != nil {
// TODO: If I can directly access tracker, why are we using in channel for
// networkVoteMessage
h.grandpa.tracker.addCatchUpRequest(msg)
}
return notificationsMessage, err
// CatchUpRequest seems like something that can be dropped, if we fail
// to process it
return h.handleCatchUpRequest(msg)
case *CatchUpResponse:
err := h.handleCatchUpResponse(msg)
// TODO: Retry for which errors
if err != nil {
// TODO: If I can directly access tracker, why are we using `in` channel for
// networkVoteMessage
h.grandpa.tracker.addCatchUpResponse(msg)
}
return nil, err
Expand Down
9 changes: 0 additions & 9 deletions lib/grandpa/message_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ type tracker struct {
in chan *types.Block // receive imported block from BlockState
stopped chan struct{}
// round is used as key
catchUpRequestMessages map[uint64]*CatchUpRequest
// round is used as key
catchUpResponseMessages map[uint64]*CatchUpResponse
}

Expand Down Expand Up @@ -74,13 +72,6 @@ func (t *tracker) addCommit(cm *CommitMessage) {
t.commitMessages[cm.Vote.Hash] = cm
}

func (t *tracker) addCatchUpRequest(cr *CatchUpRequest) {
t.mapLock.Lock()
defer t.mapLock.Unlock()

t.catchUpRequestMessages[cr.Round] = cr
}

func (t *tracker) addCatchUpResponse(cr *CatchUpResponse) {
t.mapLock.Lock()
defer t.mapLock.Unlock()
Expand Down
6 changes: 3 additions & 3 deletions lib/grandpa/message_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestMessageTracker_ValidateMessage(t *testing.T) {
msg: msg,
}

_, err = gs.validateMessage("", msg)
_, err = gs.validateVoteMessage("", msg)
require.Equal(t, err, ErrBlockDoesNotExist)
require.Equal(t, expected, gs.tracker.voteMessages[fake.Hash()][kr.Alice().Public().(*ed25519.PublicKey).AsBytes()])
}
Expand Down Expand Up @@ -69,7 +69,7 @@ func TestMessageTracker_SendMessage(t *testing.T) {
msg: msg,
}

_, err = gs.validateMessage("", msg)
_, err = gs.validateVoteMessage("", msg)
require.Equal(t, err, ErrBlockDoesNotExist)
require.Equal(t, expected, gs.tracker.voteMessages[next.Hash()][kr.Alice().Public().(*ed25519.PublicKey).AsBytes()])

Expand Down Expand Up @@ -115,7 +115,7 @@ func TestMessageTracker_ProcessMessage(t *testing.T) {
msg: msg,
}

_, err = gs.validateMessage("", msg)
_, err = gs.validateVoteMessage("", msg)
require.Equal(t, ErrBlockDoesNotExist, err)
require.Equal(t, expected, gs.tracker.voteMessages[next.Hash()][kr.Alice().Public().(*ed25519.PublicKey).AsBytes()])

Expand Down
2 changes: 0 additions & 2 deletions lib/grandpa/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,8 @@ func (s *Service) handleNetworkMessage(from peer.ID, msg NotificationsMessage) (

switch m.(type) {
case *NeighbourMessage:
// TODO: why don't we do anything?
return false, nil
case *CatchUpResponse:
// TODO: why don't we do anything?
return false, nil
}

Expand Down
12 changes: 7 additions & 5 deletions lib/grandpa/vote_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (s *Service) receiveVoteMessages(ctx context.Context) {
logger.Warnf("unsupported stage %s", vm.Message.Stage.String())
}

v, err := s.validateMessage(msg.from, vm)
v, err := s.validateVoteMessage(msg.from, vm)
if err != nil {
logger.Debugf("failed to validate vote message %v: %s", vm, err)
continue
Expand Down Expand Up @@ -122,9 +122,9 @@ func (s *Service) createSignedVoteAndVoteMessage(vote *Vote, stage Subround) (*S
return pc, vm, nil
}

// validateMessage validates a VoteMessage and adds it to the current votes
// validateVoteMessage validates a VoteMessage and adds it to the current votes
// it returns the resulting vote if validated, error otherwise
func (s *Service) validateMessage(from peer.ID, m *VoteMessage) (*Vote, error) {
func (s *Service) validateVoteMessage(from peer.ID, m *VoteMessage) (*Vote, error) {
// make sure round does not increment while VoteMessage is being validated
s.roundLock.Lock()
defer s.roundLock.Unlock()
Expand Down Expand Up @@ -153,12 +153,13 @@ func (s *Service) validateMessage(from peer.ID, m *VoteMessage) (*Vote, error) {
return nil, err
}

// check that setIDs match
// TODO: Change in set ID means possible change in voters (authorities). That
// would make me think that I could avoid the message in this case. Is that so?
// It seems the vote is considered invalid if set ID do not match.
if m.SetID != s.state.setID {
return nil, ErrSetIDMismatch
}

// This is where round mismatch is being checked
// check that vote is for current round
if m.Round != s.state.round {
if m.Round < s.state.round {
Expand Down Expand Up @@ -230,6 +231,7 @@ func (s *Service) validateMessage(from peer.ID, m *VoteMessage) (*Vote, error) {

equivocated := s.checkForEquivocation(voter, just, m.Message.Stage)
if equivocated {
// A vote is considered invalid if it is equivocatory
return nil, ErrEquivocation
}

Expand Down
12 changes: 6 additions & 6 deletions lib/grandpa/vote_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func TestValidateMessage_Valid(t *testing.T) {
require.NoError(t, err)
gs.keypair = kr.Bob().(*ed25519.Keypair)

vote, err := gs.validateMessage("", msg)
vote, err := gs.validateVoteMessage("", msg)
require.NoError(t, err)
require.Equal(t, h.Hash(), vote.Hash)
}
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestValidateMessage_InvalidSignature(t *testing.T) {

msg.Message.Signature[63] = 0

_, err = gs.validateMessage("", msg)
_, err = gs.validateVoteMessage("", msg)
require.Equal(t, err, ErrInvalidSignature)
}

Expand Down Expand Up @@ -253,7 +253,7 @@ func TestValidateMessage_SetIDMismatch(t *testing.T) {

gs.state.setID = 1

_, err = gs.validateMessage("", msg)
_, err = gs.validateVoteMessage("", msg)
require.Equal(t, err, ErrSetIDMismatch)
}

Expand Down Expand Up @@ -298,7 +298,7 @@ func TestValidateMessage_Equivocation(t *testing.T) {
require.NoError(t, err)
gs.keypair = kr.Bob().(*ed25519.Keypair)

_, err = gs.validateMessage("", msg)
_, err = gs.validateVoteMessage("", msg)
require.Equal(t, ErrEquivocation, err, gs.prevotes)
}

Expand Down Expand Up @@ -333,7 +333,7 @@ func TestValidateMessage_BlockDoesNotExist(t *testing.T) {
require.NoError(t, err)
gs.keypair = kr.Bob().(*ed25519.Keypair)

_, err = gs.validateMessage("", msg)
_, err = gs.validateVoteMessage("", msg)
require.Equal(t, err, ErrBlockDoesNotExist)
}

Expand Down Expand Up @@ -374,6 +374,6 @@ func TestValidateMessage_IsNotDescendant(t *testing.T) {
require.NoError(t, err)
gs.keypair = kr.Bob().(*ed25519.Keypair)

_, err = gs.validateMessage("", msg)
_, err = gs.validateVoteMessage("", msg)
require.Equal(t, errInvalidVoteBlock, err, gs.prevotes)
}

0 comments on commit e4c791e

Please sign in to comment.