Skip to content

Commit

Permalink
lib/grandpa: ensure catch-up logic works
Browse files Browse the repository at this point in the history
- ensure catch-up logic works when interacting with substrate nodes
- if the round in a neighbour message is ahead of our current round by more than a
threshold, send a catch-up request
- process the catch-up response; if we can't process it at the moment,
store it to process later.

Closes #1531
  • Loading branch information
kishansagathiya committed Jan 10, 2022
1 parent 9e21587 commit b7d814c
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 22 deletions.
6 changes: 6 additions & 0 deletions lib/grandpa/grandpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ type Service struct {
in chan *networkVoteMessage // only used to receive *VoteMessage
finalisedCh chan *types.FinalisationInfo
neighbourMessage *NeighbourMessage // cached neighbour message

catchUpResponseCh chan *CatchUpResponse

CatchUpResponseCacheLock sync.Mutex
// round number is used as key
CatchUpResponseCache map[uint64]CatchUpResponse
}

// Config represents a GRANDPA service configuration
Expand Down
130 changes: 108 additions & 22 deletions lib/grandpa/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"fmt"
"math/big"
"reflect"
"time"

"github.com/ChainSafe/chaindb"
"github.com/ChainSafe/gossamer/dot/network"
"github.com/ChainSafe/gossamer/dot/telemetry"
"github.com/ChainSafe/gossamer/dot/types"
Expand All @@ -21,6 +23,8 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
)

// CATCHUP_THRESHOLD is the number of rounds by which the round in a peer's
// neighbour message must exceed our highest known round (strictly greater
// than) before a catch up request is sent to that peer.
const CATCHUP_THRESHOLD = 2

// MessageHandler handles GRANDPA consensus messages
type MessageHandler struct {
grandpa *Service
Expand Down Expand Up @@ -53,17 +57,27 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network.
case *CommitMessage:
return nil, h.handleCommitMessage(msg)
case *NeighbourMessage:
return nil, h.handleNeighbourMessage(msg)
fmt.Println("got a neighbour message")
return nil, h.handleNeighbourMessage(msg, from)
case *CatchUpRequest:
return h.handleCatchUpRequest(msg)
networkMessage, err := h.handleCatchUpRequest(msg, from)
if err != nil {
logger.Debugf("could not handle catch up request: %s", err)
}

return networkMessage, err
case *CatchUpResponse:
return nil, h.handleCatchUpResponse(msg)
err := h.handleCatchUpResponse(msg)
if err != nil {
logger.Debugf("could not catchup: %s", err)
}
return nil, err
default:
return nil, ErrInvalidMessageType
}
}

func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage) error {
func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage, from peer.ID) error {
currFinalized, err := h.blockState.GetFinalisedHeader(0, 0)
if err != nil {
return err
Expand All @@ -76,21 +90,74 @@ func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage) error {

// TODO; determine if there is some reason we don't receive justifications in responses near the head (usually),
// and remove the following code if it's fixed. (#1815)
head, err := h.blockState.BestBlockNumber()
// head, err := h.blockState.BestBlockNumber()
// if err != nil {
// return err
// }

// TODO: Why are we ignoring these? Isn't
// msg.Number likely to be higher if we are lagging behind?
// ignore neighbour messages that are above our head
// if int64(msg.Number) > head.Int64() {
// return nil
// }

logger.Debugf("got neighbour message with number %d, set id %d and round %d", msg.Number, msg.SetID, msg.Round)
// TODO: should we send a justification request here? potentially re-connect this to sync package? (#1815)

highestRound, highestSetID, err := h.blockState.GetHighestRoundAndSetID()
if err != nil {
return err
}

// ignore neighbour messages that are above our head
if int64(msg.Number) > head.Int64() {
return nil
logger.Debugf("lagging behind by %d", msg.Round-highestRound)
// catch up only if we are behind by more than catchup threshold
if (msg.Round - highestRound) > CATCHUP_THRESHOLD {
catchUpResponse, err := h.sendCatchUpRequest(
from, newCatchUpRequest(highestRound, highestSetID),
)
if err != nil {
logger.Debugf("failed to send catch up request: %s", err.Error())
return err
}

logger.Debugf("sent a catch up request to node %s", from)

err = h.handleCatchUpResponse(catchUpResponse)
if err != nil {
return err
}
}

logger.Debugf("got neighbour message with number %d, set id %d and round %d", msg.Number, msg.SetID, msg.Round)
// TODO: should we send a justification request here? potentially re-connect this to sync package? (#1815)
return nil
}

// sendCatchUpRequest sends req to the peer `to` and then waits for a catch up
// response to arrive on the grandpa service's catchUpResponseCh.
//
// It returns the first response received on that channel. An error is
// returned if the request cannot be converted to a consensus message, if the
// network send fails, or if no response arrives within 5 seconds.
//
// NOTE(review): the response taken from the channel is not checked to have
// originated from `to`; whatever is delivered first is returned.
func (h *MessageHandler) sendCatchUpRequest(to peer.ID, req *CatchUpRequest) (*CatchUpResponse, error) {
	cm, err := req.ToConsensusMessage()
	if err != nil {
		return nil, err
	}

	if err = h.grandpa.network.SendMessage(to, cm); err != nil {
		return nil, err
	}

	// TODO: can we do this without pausing?
	// h.grandpa.paused.Store(true)

	// Bound the wait so the caller is not blocked indefinitely when the peer
	// never answers; the timer is stopped on return to release it promptly.
	timer := time.NewTimer(time.Second * 5)
	defer timer.Stop()

	select {
	case resp := <-h.grandpa.catchUpResponseCh:
		return resp, nil
	case <-timer.C:
		return nil, errors.New("timeout")
	}
}

func (h *MessageHandler) handleCommitMessage(msg *CommitMessage) error {
logger.Debugf("received commit message, msg: %+v", msg)

Expand Down Expand Up @@ -141,7 +208,7 @@ func (h *MessageHandler) handleCommitMessage(msg *CommitMessage) error {
return nil
}

func (h *MessageHandler) handleCatchUpRequest(msg *CatchUpRequest) (*ConsensusMessage, error) {
func (h *MessageHandler) handleCatchUpRequest(msg *CatchUpRequest, from peer.ID) (*ConsensusMessage, error) {
if !h.grandpa.authority {
return nil, nil //nolint:nilnil
}
Expand All @@ -157,15 +224,26 @@ func (h *MessageHandler) handleCatchUpRequest(msg *CatchUpRequest) (*ConsensusMe
return nil, ErrInvalidCatchUpRound
}

resp, err := h.grandpa.newCatchUpResponse(msg.Round, msg.SetID)
resp, err := h.grandpa.newCatchUpResponse(h.grandpa.state.round, h.grandpa.state.setID)
if err != nil {
return nil, err
}

cm, err := resp.ToConsensusMessage()
if err != nil {
return nil, err
}

err = h.grandpa.network.SendMessage(from, cm)
if err != nil {
return nil, err
}

logger.Debugf(
"sending catch up response with hash %s for round %d and set id %d",
"sent catch up response with hash %s for round %d and set id %d",
resp.Hash, msg.Round, msg.SetID)
return resp.ToConsensusMessage()

return nil, nil
}

func (h *MessageHandler) handleCatchUpResponse(msg *CatchUpResponse) error {
Expand All @@ -177,11 +255,6 @@ func (h *MessageHandler) handleCatchUpResponse(msg *CatchUpResponse) error {
"received catch up response with hash %s for round %d and set id %d",
msg.Hash, msg.Round, msg.SetID)

// TODO: re-add catch-up logic (#1531)
if true {
return nil
}

// if we aren't currently expecting a catch up response, return
if !h.grandpa.paused.Load().(bool) {
logger.Debug("not currently paused, ignoring catch up response")
Expand All @@ -192,10 +265,14 @@ func (h *MessageHandler) handleCatchUpResponse(msg *CatchUpResponse) error {
return ErrSetIDMismatch
}

if msg.Round != h.grandpa.state.round-1 {
if msg.Round <= h.grandpa.state.round {
return ErrInvalidCatchUpResponseRound
}

// TODO: confirm if we should add the message to the channel after or before
// checking set id and round.
h.grandpa.catchUpResponseCh <- msg

prevote, err := h.verifyPreVoteJustification(msg)
if err != nil {
return err
Expand Down Expand Up @@ -224,15 +301,24 @@ func (h *MessageHandler) handleCatchUpResponse(msg *CatchUpResponse) error {

// update state and signal to grandpa we are ready to initiate
head, err := h.grandpa.blockState.GetHeader(msg.Hash)
if err != nil {
if errors.Is(err, chaindb.ErrKeyNotFound) {
h.grandpa.CatchUpResponseCacheLock.Lock()

h.grandpa.CatchUpResponseCache[msg.Round] = *msg

h.grandpa.CatchUpResponseCacheLock.Unlock()

logger.Debugf("couldn not catch up to round %d, storing the catch up response to retry", msg.Round)
return nil
} else if err != nil {
return err
}

h.grandpa.head = head
h.grandpa.state.round = msg.Round
close(h.grandpa.resumed)
h.grandpa.resumed = make(chan struct{})
h.grandpa.paused.Store(false)
// h.grandpa.paused.Store(false)
logger.Debugf("caught up to round; unpaused service and grandpa state round is %d", h.grandpa.state.round)
return nil
}
Expand Down

0 comments on commit b7d814c

Please sign in to comment.