Skip to content

Commit

Permalink
feat(dot/network, lib/grandpa): request justification on receiving Ne…
Browse files Browse the repository at this point in the history
…ighbourMessage, verify justification on receipt (#1529)
  • Loading branch information
noot authored Apr 20, 2021
1 parent b18290c commit e1f9f42
Show file tree
Hide file tree
Showing 28 changed files with 430 additions and 266 deletions.
4 changes: 4 additions & 0 deletions dot/core/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,10 @@ func (s *mockSyncer) ProcessBlockData(_ []*types.BlockData) (int, error) {
return 0, nil
}

func (s *mockSyncer) ProcessJustification(data []*types.BlockData) (int, error) {
return 0, nil
}

func (s *mockSyncer) IsSynced() bool {
return false
}
Expand Down
2 changes: 1 addition & 1 deletion dot/network/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func (h *host) bootstrap() {
failed++
}
}
if failed == len(all) {
if failed == len(all) && len(all) != 0 {
logger.Error("failed to bootstrap to any bootnode")
}
}
Expand Down
2 changes: 2 additions & 0 deletions dot/network/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type Syncer interface {
// CreateBlockResponse is called upon receipt of a BlockRequestMessage to create the response
CreateBlockResponse(*BlockRequestMessage) (*BlockResponseMessage, error)

ProcessJustification(data []*types.BlockData) (int, error)

// ProcessBlockData is called to process BlockData received in a BlockResponseMessage
ProcessBlockData(data []*types.BlockData) (int, error)

Expand Down
6 changes: 4 additions & 2 deletions dot/network/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,7 @@ func (q *syncQueue) pushResponse(resp *BlockResponseMessage, pid peer.ID) error
}

if numJustifications == 0 {
logger.Debug("got empty justification data", "start hash", startHash)
return errEmptyJustificationData
}

Expand All @@ -484,7 +485,7 @@ func (q *syncQueue) pushResponse(resp *BlockResponseMessage, pid peer.ID) error
from: pid,
})

logger.Info("pushed justification data to queue", "hash", startHash)
logger.Debug("pushed justification data to queue", "hash", startHash)
q.responseCh <- justificationResponses
return nil
}
Expand Down Expand Up @@ -668,7 +669,7 @@ func (q *syncQueue) handleBlockJustification(data []*types.BlockData) {
startHash, endHash := data[0].Hash, data[len(data)-1].Hash
logger.Debug("sending justification data to syncer", "start", startHash, "end", endHash)

_, err := q.s.syncer.ProcessBlockData(data)
_, err := q.s.syncer.ProcessJustification(data)
if err != nil {
logger.Warn("failed to handle block justifications", "error", err)
return
Expand Down Expand Up @@ -792,6 +793,7 @@ func (q *syncQueue) handleBlockAnnounceHandshake(blockNum uint32, from peer.ID)

func (q *syncQueue) handleBlockAnnounce(msg *BlockAnnounceMessage, from peer.ID) {
q.updatePeerScore(from, 1)
logger.Info("received BlockAnnounce", "number", msg.Number, "from", from)

header, err := types.NewHeader(msg.ParentHash, msg.StateRoot, msg.ExtrinsicsRoot, msg.Number, msg.Digest)
if err != nil {
Expand Down
59 changes: 8 additions & 51 deletions dot/network/sync_justification.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,62 +18,19 @@ package network

import (
"math/big"
"time"
)

func (q *syncQueue) finalizeAtHead() {
prev, err := q.s.blockState.GetFinalizedHeader(0, 0)
if err != nil {
logger.Error("failed to get latest finalized block header", "error", err)
return
}

for {
select {
// sleep for average block time TODO: make this configurable from slot duration
case <-time.After(q.slotDuration * 2):
case <-q.ctx.Done():
return
}

head, err := q.s.blockState.BestBlockNumber()
if err != nil {
continue
}

if head.Int64() < q.goal {
continue
}

curr, err := q.s.blockState.GetFinalizedHeader(0, 0)
if err != nil {
continue
}

logger.Debug("checking finalized blocks", "curr", curr.Number, "prev", prev.Number)

if curr.Number.Cmp(prev.Number) > 0 {
prev = curr
continue
}

prev = curr

start := head.Uint64() - uint64(blockRequestSize)
if curr.Number.Uint64() > start {
start = curr.Number.Uint64() + 1
} else if int(start) < int(blockRequestSize) {
start = 1
}
"github.com/libp2p/go-libp2p-core/peer"
)

q.pushJustificationRequest(start)
}
// SendJustificationRequest pushes a justification request to the queue to be sent out to the network
func (s *Service) SendJustificationRequest(to peer.ID, num uint32) {
s.syncQueue.pushJustificationRequest(to, uint64(num))
}

func (q *syncQueue) pushJustificationRequest(start uint64) {
func (q *syncQueue) pushJustificationRequest(to peer.ID, start uint64) {
startHash, err := q.s.blockState.GetHashByNumber(big.NewInt(int64(start)))
if err != nil {
logger.Error("failed to get hash for block w/ number", "number", start, "error", err)
logger.Debug("failed to get hash for block w/ number", "number", start, "error", err)
return
}

Expand All @@ -87,6 +44,6 @@ func (q *syncQueue) pushJustificationRequest(start uint64) {

q.requestCh <- &syncRequest{
req: req,
to: "",
to: to,
}
}
32 changes: 0 additions & 32 deletions dot/network/sync_justification_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package network

import (
"context"
"math/big"
"testing"
"time"

Expand Down Expand Up @@ -135,34 +134,3 @@ func TestSyncQueue_processBlockResponses_Justification(t *testing.T) {
require.True(t, ok)
require.Equal(t, 2, score)
}

func TestSyncQueue_finalizeAtHead(t *testing.T) {
q := newTestSyncQueue(t)
q.stop()
time.Sleep(time.Second)
q.ctx = context.Background()
q.slotDuration = time.Millisecond * 200

hash, err := q.s.blockState.GetHashByNumber(big.NewInt(1))
require.NoError(t, err)

go q.finalizeAtHead()
time.Sleep(time.Second)

data, has := q.justificationRequestData.Load(hash)
require.True(t, has)
require.Equal(t, requestData{}, data)

expected := createBlockRequestWithHash(hash, blockRequestSize)
expected.RequestedData = RequestedDataJustification

select {
case req := <-q.requestCh:
require.Equal(t, &syncRequest{
req: expected,
to: "",
}, req)
case <-time.After(time.Second):
t.Fatal("did not receive request")
}
}
4 changes: 4 additions & 0 deletions dot/network/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ func (s *mockSyncer) ProcessBlockData(data []*types.BlockData) (int, error) {
return 0, nil
}

func (s *mockSyncer) ProcessJustification(data []*types.BlockData) (int, error) {
return 0, nil
}

func (s *mockSyncer) IsSynced() bool {
return s.synced
}
Expand Down
14 changes: 7 additions & 7 deletions dot/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func NodeInitialized(basepath string, expected bool) bool {
_, err := os.Stat(registry)
if os.IsNotExist(err) {
if expected {
logger.Warn(
logger.Debug(
"node has not been initialized",
"basepath", basepath,
"error", "failed to locate KEYREGISTRY file in data directory",
Expand Down Expand Up @@ -242,12 +242,6 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node,
return nil, err
}

// Syncer
syncer, err := createSyncService(cfg, stateSrvc, bp, dh, ver, rt)
if err != nil {
return nil, err
}

// create GRANDPA service
fg, err := createGRANDPAService(cfg, rt, stateSrvc, dh, ks.Gran, networkSrvc)
if err != nil {
Expand All @@ -256,6 +250,12 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node,
nodeSrvcs = append(nodeSrvcs, fg)
dh.SetFinalityGadget(fg) // TODO: this should be cleaned up

// Syncer
syncer, err := createSyncService(cfg, stateSrvc, bp, fg, dh, ver, rt)
if err != nil {
return nil, err
}

// Core Service

// create core service and append core service to node services
Expand Down
4 changes: 4 additions & 0 deletions dot/rpc/modules/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func (s *mockSyncer) ProcessBlockData(_ []*types.BlockData) (int, error) {
return 0, nil
}

func (s *mockSyncer) ProcessJustification(_ []*types.BlockData) (int, error) {
return 0, nil
}

func (s *mockSyncer) HandleBlockAnnounce(msg *network.BlockAnnounceMessage) error {
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion dot/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,13 +381,14 @@ func createBlockVerifier(st *state.Service) (*babe.VerificationManager, error) {
return ver, nil
}

func createSyncService(cfg *Config, st *state.Service, bp sync.BlockProducer, dh *core.DigestHandler, verifier *babe.VerificationManager, rt runtime.Instance) (*sync.Service, error) {
func createSyncService(cfg *Config, st *state.Service, bp sync.BlockProducer, fg sync.FinalityGadget, dh *core.DigestHandler, verifier *babe.VerificationManager, rt runtime.Instance) (*sync.Service, error) {
syncCfg := &sync.Config{
LogLvl: cfg.Log.SyncLvl,
BlockState: st.Block,
StorageState: st.Storage,
TransactionState: st.Transaction,
BlockProducer: bp,
FinalityGadget: fg,
Verifier: verifier,
Runtime: rt,
DigestHandler: dh,
Expand Down
2 changes: 1 addition & 1 deletion dot/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func TestCreateSyncService(t *testing.T) {
ver, err := createBlockVerifier(stateSrvc)
require.NoError(t, err)

_, err = createSyncService(cfg, stateSrvc, nil, nil, ver, rt)
_, err = createSyncService(cfg, stateSrvc, nil, nil, nil, ver, rt)
require.NoError(t, err)
}

Expand Down
5 changes: 5 additions & 0 deletions dot/state/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,11 @@ func (s *Service) Rewind(toBlock int64) error {
return err
}

err = s.Block.SetFinalizedHash(header.Hash(), 0, 0)
if err != nil {
return err
}

return StoreBestBlockHash(s.db, newHead)
}

Expand Down
5 changes: 5 additions & 0 deletions dot/sync/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,8 @@ type DigestHandler interface {
type Verifier interface {
VerifyBlock(header *types.Header) error
}

// FinalityGadget implements justification verification functionality
type FinalityGadget interface {
VerifyBlockJustification([]byte) error
}
32 changes: 31 additions & 1 deletion dot/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Service struct {
storageState StorageState
transactionState TransactionState
blockProducer BlockProducer
finalityGadget FinalityGadget

// Synchronization variables
synced bool
Expand All @@ -63,6 +64,7 @@ type Config struct {
BlockState BlockState
StorageState StorageState
BlockProducer BlockProducer
FinalityGadget FinalityGadget
TransactionState TransactionState
Runtime runtime.Instance
Verifier Verifier
Expand Down Expand Up @@ -105,6 +107,7 @@ func NewService(cfg *Config) (*Service, error) {
blockState: cfg.BlockState,
storageState: cfg.StorageState,
blockProducer: cfg.BlockProducer,
finalityGadget: cfg.FinalityGadget,
synced: true,
highestSeenBlock: big.NewInt(0),
transactionState: cfg.TransactionState,
Expand Down Expand Up @@ -149,6 +152,27 @@ func (s *Service) HandleBlockAnnounce(msg *network.BlockAnnounceMessage) error {
return nil
}

// ProcessJustification processes block data containing justifications
func (s *Service) ProcessJustification(data []*types.BlockData) (int, error) {
if len(data) == 0 {
return 0, ErrNilBlockData
}

for i, bd := range data {
header, err := s.blockState.GetHeader(bd.Hash)
if err != nil {
return i, err
}

if bd.Justification != nil && bd.Justification.Exists() {
logger.Debug("handling Justification...", "number", header.Number, "hash", bd.Hash)
s.handleJustification(header, bd.Justification.Value())
}
}

return 0, nil
}

// ProcessBlockData processes the BlockData from a BlockResponse and returns the index of the last BlockData it handled on success,
// or the index of the block data that errored on failure.
func (s *Service) ProcessBlockData(data []*types.BlockData) (int, error) {
Expand Down Expand Up @@ -351,7 +375,13 @@ func (s *Service) handleJustification(header *types.Header, justification []byte
return
}

err := s.blockState.SetFinalizedHash(header.Hash(), 0, 0)
err := s.finalityGadget.VerifyBlockJustification(justification)
if err != nil {
logger.Warn("failed to verify block justification", "hash", header.Hash(), "number", header.Number, "error", err)
return
}

err = s.blockState.SetFinalizedHash(header.Hash(), 0, 0)
if err != nil {
logger.Error("failed to set finalized hash", "error", err)
return
Expand Down
Loading

0 comments on commit e1f9f42

Please sign in to comment.