Skip to content

Commit

Permalink
resolve comments
Browse files Browse the repository at this point in the history
Signed-off-by: kyrie-yl <[email protected]>
  • Loading branch information
kyrie-yl committed Mar 8, 2022
1 parent 2ed75ab commit 7b070c0
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 67 deletions.
2 changes: 1 addition & 1 deletion core/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error {
},
// for fast node which verify trie from remote verify peers, a block's H-11 ancestor should have been verify.
func() error {
if v.remoteValidator != nil && !v.remoteValidator.AncestorVerified(v.bc.GetHeaderByNumber(header.Number.Uint64())) {
if v.remoteValidator != nil && !v.remoteValidator.AncestorVerified(header) {
return fmt.Errorf("block's ancessor %x has not been verified", block.Hash())
}
return nil
Expand Down
109 changes: 43 additions & 66 deletions core/remote_state_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ package core

import (
"fmt"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/metrics"
"math/rand"
"time"

lru "github.com/hashicorp/golang-lru"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
)

const (
Expand All @@ -30,11 +30,11 @@ const (
)

var (
remoteVerifyTaskCounter = metrics.NewRegisteredCounter("remote/state/verify/task/total", nil)
succeedRemoteVerifyTaskMeter = metrics.NewRegisteredMeter("succeed/remote/verify/task", nil)
failedRemoteVerifyTaskMeter = metrics.NewRegisteredMeter("failed/remote/verify/task", nil)
verifyTaskCounter = metrics.NewRegisteredCounter("verifymanager/task/total", nil)
verifyTaskSucceedMeter = metrics.NewRegisteredMeter("verifymanager/task/result/succeed", nil)
verifyTaskFailedMeter = metrics.NewRegisteredMeter("verifymanager/task/result/failed", nil)

succeedTaskExecutionTimer = metrics.NewRegisteredTimer("succeed/task/execution", nil)
verifyTaskExecutionTimer = metrics.NewRegisteredTimer("verifymanager/task/execution", nil)
)

type remoteVerifyManager struct {
Expand All @@ -53,14 +53,14 @@ type remoteVerifyManager struct {
messageCh chan verifyMessage
}

func NewVerifyManager(blockchain *BlockChain, peers verifyPeers, allowUntrusted bool) *remoteVerifyManager {
func NewVerifyManager(blockchain *BlockChain, peers verifyPeers, allowInsecure bool) *remoteVerifyManager {
verifiedCache, _ := lru.New(verifiedCacheSize)
vm := &remoteVerifyManager{
bc: blockchain,
tasks: make(map[common.Hash]*verifyTask),
peers: peers,
verifiedCache: verifiedCache,
allowInsecure: allowUntrusted,
allowInsecure: allowInsecure,

chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
verifyCh: make(chan common.Hash, maxForkHeight),
Expand All @@ -87,18 +87,18 @@ func (vm *remoteVerifyManager) mainLoop() {
vm.cacheBlockVerified(hash)
if task, ok := vm.tasks[hash]; ok {
delete(vm.tasks, hash)
remoteVerifyTaskCounter.Dec(1)
succeedRemoteVerifyTaskMeter.Mark(1)
succeedTaskExecutionTimer.Update(time.Since(task.startAt))
verifyTaskCounter.Dec(1)
verifyTaskSucceedMeter.Mark(1)
verifyTaskExecutionTimer.Update(time.Since(task.startAt))
close(task.terminalCh)
}
case <-pruneTicker.C:
for hash, task := range vm.tasks {
if vm.bc.CurrentHeader().Number.Cmp(task.blockHeader.Number) == 1 &&
vm.bc.CurrentHeader().Number.Uint64()-task.blockHeader.Number.Uint64() > pruneHeightDiff {
delete(vm.tasks, hash)
remoteVerifyTaskCounter.Dec(1)
failedRemoteVerifyTaskMeter.Mark(1)
verifyTaskCounter.Dec(1)
verifyTaskFailedMeter.Mark(1)
close(task.terminalCh)
}
}
Expand Down Expand Up @@ -137,7 +137,7 @@ func (vm *remoteVerifyManager) NewBlockVerifyTask(header *types.Header) {
if diffLayer, err = vm.bc.GenerateDiffLayer(hash); err != nil {
log.Error("failed to get diff layer", "block", hash, "number", header.Number, "error", err)
return
} else {
} else if diffLayer == nil {
log.Info("this is an empty block:", "block", hash, "number", header.Number)
return
}
Expand All @@ -149,7 +149,7 @@ func (vm *remoteVerifyManager) NewBlockVerifyTask(header *types.Header) {
}
verifyTask := NewVerifyTask(diffHash, header, vm.peers, vm.verifyCh, vm.allowInsecure)
vm.tasks[hash] = verifyTask
remoteVerifyTaskCounter.Inc(1)
verifyTaskCounter.Inc(1)
}(header.Hash())
header = vm.bc.GetHeaderByHash(header.ParentHash)
}
Expand All @@ -173,7 +173,7 @@ func (vm *remoteVerifyManager) AncestorVerified(header *types.Header) bool {
// check whether H-11 block is a empty block.
if header.TxHash == types.EmptyRootHash {
parent := vm.bc.GetHeaderByHash(header.ParentHash)
return header.Root == parent.Root
return parent == nil || header.Root == parent.Root
}
hash := header.Hash()
_, exist := vm.verifiedCache.Get(hash)
Expand Down Expand Up @@ -201,21 +201,21 @@ type verifyTask struct {
diffhash common.Hash
blockHeader *types.Header
candidatePeers verifyPeers
BadPeers map[string]struct{}
badPeers map[string]struct{}
startAt time.Time
allowUntrusted bool
allowInsecure bool

messageCh chan verifyMessage
terminalCh chan struct{}
}

func NewVerifyTask(diffhash common.Hash, header *types.Header, peers verifyPeers, verifyCh chan common.Hash, allowUntrusted bool) *verifyTask {
func NewVerifyTask(diffhash common.Hash, header *types.Header, peers verifyPeers, verifyCh chan common.Hash, allowInsecure bool) *verifyTask {
vt := &verifyTask{
diffhash: diffhash,
blockHeader: header,
candidatePeers: peers,
BadPeers: make(map[string]struct{}),
allowUntrusted: allowUntrusted,
badPeers: make(map[string]struct{}),
allowInsecure: allowInsecure,
messageCh: make(chan verifyMessage),
terminalCh: make(chan struct{}),
}
Expand All @@ -232,42 +232,21 @@ func (vt *verifyTask) Start(verifyCh chan common.Hash) {
for {
select {
case msg := <-vt.messageCh:
switch msg.verifyResult.Status.Code / 100 {
case 1:
switch msg.verifyResult.Status {
case types.StatusFullVerified:
vt.compareRootHashAndWrite(msg, verifyCh)
newRecievedMsgTypeGauge("fullVerified", msg.peerId).Inc(1)
case types.StatusPartiallyVerified:
log.Warn("block %s , num= %s is untrusted verified", msg.verifyResult.BlockHash, msg.verifyResult.BlockNumber)
if vt.allowUntrusted {
vt.compareRootHashAndWrite(msg, verifyCh)
}
newRecievedMsgTypeGauge("partialVerified", msg.peerId).Inc(1)
switch msg.verifyResult.Status {
case types.StatusFullVerified:
vt.compareRootHashAndMark(msg, verifyCh)
case types.StatusPartiallyVerified:
log.Warn("block %s , num= %s is insecure verified", msg.verifyResult.BlockHash, msg.verifyResult.BlockNumber)
if vt.allowInsecure {
vt.compareRootHashAndMark(msg, verifyCh)
}

case 2, 4:
vt.BadPeers[msg.peerId] = struct{}{}
case types.StatusDiffHashMismatch, types.StatusImpossibleFork, types.StatusUnexpectedError:
vt.badPeers[msg.peerId] = struct{}{}
log.Info("peer %s is not available: code %d, msg %s,", msg.peerId, msg.verifyResult.Status.Code, msg.verifyResult.Status.Msg)
switch msg.verifyResult.Status {
case types.StatusDiffHashMismatch:
newRecievedMsgTypeGauge("diffHashMismatch", msg.peerId).Inc(1)
case types.StatusImpossibleFork:
newRecievedMsgTypeGauge("impossibleFork", msg.peerId).Inc(1)
case types.StatusUnexpectedError:
newRecievedMsgTypeGauge("unexpectedError", msg.peerId).Inc(1)
}
case 3:
case types.StatusBlockTooNew, types.StatusBlockNewer, types.StatusPossibleFork:
log.Info("return msg from peer %s for block %s is %s", msg.peerId, msg.verifyResult.BlockHash, msg.verifyResult.Status.Msg)
switch msg.verifyResult.Status {
case types.StatusBlockTooNew:
newRecievedMsgTypeGauge("blockTooNew", msg.peerId).Inc(1)
case types.StatusBlockNewer:
newRecievedMsgTypeGauge("blockNewer", msg.peerId).Inc(1)
case types.StatusPossibleFork:
newRecievedMsgTypeGauge("possibleFork", msg.peerId).Inc(1)
}
}
newVerifyMsgTypeGauge(msg.verifyResult.Status.Code, msg.peerId).Inc(1)
case <-resend.C:
// if a task has run over 15s, try all the vaild peers to verify.
if time.Since(vt.startAt) < tryAllPeersTime {
Expand All @@ -287,37 +266,35 @@ func (vt *verifyTask) sendVerifyRequest(n int) {
var validPeers []VerifyPeer
candidatePeers := vt.candidatePeers.GetVerifyPeers()
for _, p := range candidatePeers {
if _, ok := vt.BadPeers[p.ID()]; !ok {
if _, ok := vt.badPeers[p.ID()]; !ok {
validPeers = append(validPeers, p)
}
}
// if has not valid peer, log warning.
if len(validPeers) == 0 {
log.Warn("there is no valid peer for block", vt.blockHeader.Number)
}
if n < 0 || n >= len(validPeers) {
for _, p := range validPeers {
p.RequestRoot(vt.blockHeader.Number.Uint64(), vt.blockHeader.Hash(), vt.diffhash)
}
return
}

// if n < len(validPeers), select n peers from validPeers randomly.
rand.Seed(time.Now().UnixNano())
rand.Shuffle(len(validPeers), func(i, j int) { validPeers[i], validPeers[j] = validPeers[j], validPeers[i] })
if n < len(validPeers) && n > 0 {
rand.Seed(time.Now().UnixNano())
rand.Shuffle(len(validPeers), func(i, j int) { validPeers[i], validPeers[j] = validPeers[j], validPeers[i] })
} else {
n = len(validPeers)
}
for i := 0; i < n; i++ {
p := validPeers[i]
p.RequestRoot(vt.blockHeader.Number.Uint64(), vt.blockHeader.Hash(), vt.diffhash)
}
}

func (vt *verifyTask) compareRootHashAndWrite(msg verifyMessage, verifyCh chan common.Hash) {
func (vt *verifyTask) compareRootHashAndMark(msg verifyMessage, verifyCh chan common.Hash) {
if msg.verifyResult.Root == vt.blockHeader.Root {
blockhash := msg.verifyResult.BlockHash
// write back to manager so that manager can cache the result and delete this task.
verifyCh <- blockhash
} else {
vt.BadPeers[msg.peerId] = struct{}{}
vt.badPeers[msg.peerId] = struct{}{}
}
}

Expand Down Expand Up @@ -393,7 +370,7 @@ func (mode VerifyMode) NeedRemoteVerify() bool {
return mode == FullVerify || mode == InsecureVerify
}

func newRecievedMsgTypeGauge(msgType, peerId string) metrics.Gauge {
m := fmt.Sprintf("recieved/%s/message/from/%s", msgType, peerId)
func newVerifyMsgTypeGauge(msgType uint16, peerId string) metrics.Gauge {
m := fmt.Sprintf("verifymanager/message/%d/peer/%s", msgType, peerId)
return metrics.GetOrRegisterGauge(m, nil)
}

0 comments on commit 7b070c0

Please sign in to comment.