From b7d814c368ec33e59892246e1a4ae3ff07b03704 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Mon, 10 Jan 2022 17:43:22 +0530 Subject: [PATCH] lib/grandpa: ensure catch-up logic works - ensure catch-up logic works when interacting with substrate nodes - If round in the neighbour message is ahead of our current round by a threshold, send a catch up request - process the catch up response, if we can't process it at the moment, store it to process later. Closes #1531 --- lib/grandpa/grandpa.go | 6 ++ lib/grandpa/message_handler.go | 130 +++++++++++++++++++++++++++------ 2 files changed, 114 insertions(+), 22 deletions(-) diff --git a/lib/grandpa/grandpa.go b/lib/grandpa/grandpa.go index 0955dcbb7f..ddf34e75ef 100644 --- a/lib/grandpa/grandpa.go +++ b/lib/grandpa/grandpa.go @@ -74,6 +74,12 @@ type Service struct { in chan *networkVoteMessage // only used to receive *VoteMessage finalisedCh chan *types.FinalisationInfo neighbourMessage *NeighbourMessage // cached neighbour message + + catchUpResponseCh chan *CatchUpResponse + + CatchUpResponseCacheLock sync.Mutex + // round number is used as key + CatchUpResponseCache map[uint64]CatchUpResponse } // Config represents a GRANDPA service configuration diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index 12285fc894..168f0ae71c 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -9,7 +9,9 @@ import ( "fmt" "math/big" "reflect" + "time" + "github.com/ChainSafe/chaindb" "github.com/ChainSafe/gossamer/dot/network" "github.com/ChainSafe/gossamer/dot/telemetry" "github.com/ChainSafe/gossamer/dot/types" @@ -21,6 +23,8 @@ import ( "github.com/libp2p/go-libp2p-core/peer" ) +const CATCHUP_THRESHOLD = 2 + // MessageHandler handles GRANDPA consensus messages type MessageHandler struct { grandpa *Service @@ -53,17 +57,27 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network. case *CommitMessage: return nil, h.handleCommitMessage(msg) case *NeighbourMessage: - return nil, h.handleNeighbourMessage(msg) + fmt.Println("got a neighbour message") + return nil, h.handleNeighbourMessage(msg, from) case *CatchUpRequest: - return h.handleCatchUpRequest(msg) + networkMessage, err := h.handleCatchUpRequest(msg, from) + if err != nil { + logger.Debugf("could not handle catch up request: %s", err) + } + + return networkMessage, err case *CatchUpResponse: - return nil, h.handleCatchUpResponse(msg) + err := h.handleCatchUpResponse(msg) + if err != nil { + logger.Debugf("could not catchup: %s", err) + } + return nil, err default: return nil, ErrInvalidMessageType } } -func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage) error { +func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage, from peer.ID) error { currFinalized, err := h.blockState.GetFinalisedHeader(0, 0) if err != nil { return err @@ -76,21 +90,74 @@ func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage) error { // TODO; determine if there is some reason we don't receive justifications in responses near the head (usually), // and remove the following code if it's fixed. (#1815) - head, err := h.blockState.BestBlockNumber() + // head, err := h.blockState.BestBlockNumber() + // if err != nil { + // return err + // } + + // TODO: Why are we ignoring these? Isn't + // msg.Number likely to be higher if we are lagging behind? + // ignore neighbour messages that are above our head + // if int64(msg.Number) > head.Int64() { + // return nil + // } + + logger.Debugf("got neighbour message with number %d, set id %d and round %d", msg.Number, msg.SetID, msg.Round) + // TODO: should we send a justification request here? potentially re-connect this to sync package? (#1815) + + highestRound, highestSetID, err := h.blockState.GetHighestRoundAndSetID() if err != nil { return err } - // ignore neighbour messages that are above our head - if int64(msg.Number) > head.Int64() { - return nil + logger.Debugf("lagging behind by %d", msg.Round-highestRound) + // catch up only if we are behind by more than catchup threshold + if (msg.Round - highestRound) > CATCHUP_THRESHOLD { + catchUpResponse, err := h.sendCatchUpRequest( + from, newCatchUpRequest(highestRound, highestSetID), + ) + if err != nil { + logger.Debugf("failed to send catch up request: %s", err.Error()) + return err + } + + logger.Debugf("sent a catch up request to node %s", from) + + err = h.handleCatchUpResponse(catchUpResponse) + if err != nil { + return err + } } - logger.Debugf("got neighbour message with number %d, set id %d and round %d", msg.Number, msg.SetID, msg.Round) - // TODO: should we send a justification request here? potentially re-connect this to sync package? (#1815) return nil } +func (h *MessageHandler) sendCatchUpRequest(to peer.ID, req *CatchUpRequest) (*CatchUpResponse, error) { + cm, err := req.ToConsensusMessage() + if err != nil { + return nil, err + } + + err = h.grandpa.network.SendMessage(to, cm) + if err != nil { + return nil, err + } + + // Can we do this without pausing? + // h.grandpa.paused.Store(true) + + timer := time.NewTimer(time.Second * 5) + defer timer.Stop() + + select { + case resp := <-h.grandpa.catchUpResponseCh: + fmt.Println("got a response, this is awesome") + return resp, nil + case <-timer.C: + return nil, errors.New("timeout") + } +} + func (h *MessageHandler) handleCommitMessage(msg *CommitMessage) error { logger.Debugf("received commit message, msg: %+v", msg) @@ -141,7 +208,7 @@ func (h *MessageHandler) handleCommitMessage(msg *CommitMessage) error { return nil } -func (h *MessageHandler) handleCatchUpRequest(msg *CatchUpRequest) (*ConsensusMessage, error) { +func (h *MessageHandler) handleCatchUpRequest(msg *CatchUpRequest, from peer.ID) (*ConsensusMessage, error) { if !h.grandpa.authority { return nil, nil //nolint:nilnil } @@ -157,15 +224,26 @@ func (h *MessageHandler) handleCatchUpRequest(msg *CatchUpRequest) (*ConsensusMe return nil, ErrInvalidCatchUpRound } - resp, err := h.grandpa.newCatchUpResponse(msg.Round, msg.SetID) + resp, err := h.grandpa.newCatchUpResponse(h.grandpa.state.round, h.grandpa.state.setID) + if err != nil { + return nil, err + } + + cm, err := resp.ToConsensusMessage() + if err != nil { + return nil, err + } + + err = h.grandpa.network.SendMessage(from, cm) if err != nil { return nil, err } logger.Debugf( - "sending catch up response with hash %s for round %d and set id %d", + "sent catch up response with hash %s for round %d and set id %d", resp.Hash, msg.Round, msg.SetID) - return resp.ToConsensusMessage() + + return nil, nil } func (h *MessageHandler) handleCatchUpResponse(msg *CatchUpResponse) error { @@ -177,11 +255,6 @@ func (h *MessageHandler) handleCatchUpResponse(msg *CatchUpResponse) error { "received catch up response with hash %s for round %d and set id %d", msg.Hash, msg.Round, msg.SetID) - // TODO: re-add catch-up logic (#1531) - if true { - return nil - } - // if we aren't currently expecting a catch up response, return if !h.grandpa.paused.Load().(bool) { logger.Debug("not currently paused, ignoring catch up response") @@ -192,10 +265,14 @@ func (h *MessageHandler) handleCatchUpResponse(msg *CatchUpResponse) error { return ErrSetIDMismatch } - if msg.Round != h.grandpa.state.round-1 { + if msg.Round <= h.grandpa.state.round { return ErrInvalidCatchUpResponseRound } + // TODO: confirm if we should add the message to the channel after or before + // checking set id and round. + h.grandpa.catchUpResponseCh <- msg + prevote, err := h.verifyPreVoteJustification(msg) if err != nil { return err @@ -224,7 +301,16 @@ func (h *MessageHandler) handleCatchUpResponse(msg *CatchUpResponse) error { // update state and signal to grandpa we are ready to initiate head, err := h.grandpa.blockState.GetHeader(msg.Hash) - if err != nil { + if errors.Is(err, chaindb.ErrKeyNotFound) { + h.grandpa.CatchUpResponseCacheLock.Lock() + + h.grandpa.CatchUpResponseCache[msg.Round] = *msg + + h.grandpa.CatchUpResponseCacheLock.Unlock() + + logger.Debugf("couldn not catch up to round %d, storing the catch up response to retry", msg.Round) + return nil + } else if err != nil { return err } @@ -232,7 +318,7 @@ func (h *MessageHandler) handleCatchUpResponse(msg *CatchUpResponse) error { h.grandpa.state.round = msg.Round close(h.grandpa.resumed) h.grandpa.resumed = make(chan struct{}) - h.grandpa.paused.Store(false) + // h.grandpa.paused.Store(false) logger.Debugf("caught up to round; unpaused service and grandpa state round is %d", h.grandpa.state.round) return nil }