Skip to content

Commit

Permalink
prepare for the snapshot removal
Browse files Browse the repository at this point in the history
  • Loading branch information
yoomee1313 committed Dec 19, 2024
1 parent 4788e28 commit 0a7a3b8
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 199 deletions.
2 changes: 1 addition & 1 deletion consensus/istanbul/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ type backend struct {
// Protects the signer fields
candidatesLock sync.RWMutex
// Snapshots for recent block to speed up reorgs
recents *lru.ARCCache
recents *lru.ARCCache // TODO-kaiax: Remove snapshot cache

// event subscription for ChainHeadEvent event
broadcaster consensus.Broadcaster
Expand Down
217 changes: 19 additions & 198 deletions consensus/istanbul/backend/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -823,45 +823,10 @@ func (sb *backend) GetConsensusInfo(block *types.Block) (consensus.ConsensusInfo
return consensus.ConsensusInfo{}, nil
}

round := block.Header().Round()
view := &istanbul.View{
Sequence: new(big.Int).Set(block.Number()),
Round: new(big.Int).SetInt64(int64(round)),
}

// get the proposer of this block.
proposer, err := ecrecover(block.Header())
if err != nil {
return consensus.ConsensusInfo{}, err
}

if sb.chain == nil {
return consensus.ConsensusInfo{}, errNoChainReader
}

// get the snapshot of the previous block.
parentHash := block.ParentHash()
snap, err := sb.snapshot(sb.chain, blockNumber-1, parentHash, nil, false)
if err != nil {
logger.Error("Failed to get snapshot.", "blockNum", blockNumber, "err", err)
return consensus.ConsensusInfo{}, errInternalError
}

// get origin proposer at 0 round.
originProposer := common.Address{}
lastProposer := sb.GetProposer(blockNumber - 1)

newValSet := snap.ValSet.Copy()
newValSet.CalcProposer(lastProposer, 0)
originProposer = newValSet.GetProposer().Address()

// get the committee list of this block at the view (blockNumber, round)
committee := snap.ValSet.SubListWithProposer(parentHash, proposer, view)
committeeAddrs := make([]common.Address, len(committee))
for i, v := range committee {
committeeAddrs[i] = v.Address()
}

// get the committers of this block from committed seals
extra, err := types.ExtractIstanbulExtra(block.Header())
if err != nil {
Expand All @@ -872,10 +837,18 @@ func (sb *backend) GetConsensusInfo(block *types.Block) (consensus.ConsensusInfo
return consensus.ConsensusInfo{}, err
}

round := block.Header().Round()

// get the committee list of this block (blockNumber, round)
currentRoundCState, err := sb.GetCommitteeState(block.NumberU64())
if err != nil {
logger.Error("Failed to get committee or proposer.", "blockNum", blockNumber, "round", uint64(round), "err", err)
return consensus.ConsensusInfo{}, errInternalError
}
// Uncomment to validate if committers are in the committee
// for _, recovered := range committers {
// found := false
// for _, calculated := range committeeAddrs {
// for _, calculated := range currentRoundCState.Committee() {
// if recovered == calculated {
// found = true
// }
Expand All @@ -885,11 +858,18 @@ func (sb *backend) GetConsensusInfo(block *types.Block) (consensus.ConsensusInfo
// }
// }

// get origin proposer at 0 round.
roundZeroCState, err := sb.GetCommitteeStateByRound(blockNumber, 0)
if err != nil {
logger.Error("Failed to get committee or proposer.", "blockNum", blockNumber, "round", 0, "err", err)
return consensus.ConsensusInfo{}, errInternalError
}

cInfo := consensus.ConsensusInfo{
SigHash: sigHash(block.Header()),
Proposer: proposer,
OriginProposer: originProposer,
Committee: committeeAddrs,
Proposer: currentRoundCState.Proposer(),
OriginProposer: roundZeroCState.Proposer(),
Committee: currentRoundCState.Committee().Copy(),
Committers: committers,
Round: round,
}
Expand All @@ -902,165 +882,6 @@ func (sb *backend) InitSnapshot() {
sb.blsPubkeyProvider.ResetBlsCache()
}

// prepareSnapshotApply is a helper function to prepare snapshot and headers for the given block number and hash.
// It returns the snapshot, headers, and error if any.
func (sb *backend) prepareSnapshotApply(chain consensus.ChainReader, number uint64, hash common.Hash, parents []*types.Header) (*Snapshot, []*types.Header, error) {
// Search for a snapshot in memory or on disk for checkpoints
var (
headers []*types.Header
snap *Snapshot
)

for snap == nil {
// If an in-memory snapshot was found, use that
if s, ok := sb.recents.Get(hash); ok {
snap = s.(*Snapshot)
break
}
// If an on-disk checkpoint snapshot can be found, use that
if params.IsCheckpointInterval(number) {
if s, err := loadSnapshot(sb.db, hash); err == nil {
logger.Trace("Loaded voting snapshot form disk", "number", number, "hash", hash)
snap = s
break
}
}
// If we're at block zero, make a snapshot
if number == 0 {
var err error
if snap, err = sb.initSnapshot(chain); err != nil {
return nil, nil, err
}
break
}
// No snapshot for this header, gather the header and move backward
if header := getPrevHeaderAndUpdateParents(chain, number, hash, &parents); header == nil {
return nil, nil, consensus.ErrUnknownAncestor
} else {
headers = append(headers, header)
number, hash = number-1, header.ParentHash
}
}
// Previous snapshot found, apply any pending headers on top of it
for i := 0; i < len(headers)/2; i++ {
headers[i], headers[len(headers)-1-i] = headers[len(headers)-1-i], headers[i]
}

return snap, headers, nil
}

// GetKaiaHeadersForSnapshotApply returns the headers need to be applied to create snapshot for the given block number.
// Note that it only returns headers for kaia fork enabled blocks.
func (sb *backend) GetKaiaHeadersForSnapshotApply(chain consensus.ChainReader, number uint64, hash common.Hash, parents []*types.Header) ([]*types.Header, error) {
_, headers, err := sb.prepareSnapshotApply(chain, number, hash, parents)
if err != nil {
return nil, err
}

kaiaHeaders := []*types.Header{}
for i := 0; i < len(headers); i++ {
if chain.Config().IsKaiaForkEnabled(new(big.Int).Add(headers[i].Number, big.NewInt(1))) {
kaiaHeaders = headers[i:]
break
}
}

return kaiaHeaders, nil
}

// snapshot retrieves the state of the authorization voting at a given point in time.
// There's in-memory snapshot and on-disk snapshot. On-disk snapshot is stored every checkpointInterval blocks.
// Moreover, if the block has no in-memory or on-disk snapshot, before generating snapshot, it gathers the header and apply the vote in it.
func (sb *backend) snapshot(chain consensus.ChainReader, number uint64, hash common.Hash, parents []*types.Header, writable bool) (*Snapshot, error) {
snap, headers, err := sb.prepareSnapshotApply(chain, number, hash, parents)
if err != nil {
return nil, err
}

pset := sb.govModule.EffectiveParamSet(snap.Number)
snap, err = snap.apply(headers, sb.governance, sb.govModule, sb.address, pset.ProposerPolicy, chain, sb.stakingModule, writable)
if err != nil {
return nil, err
}

// If we've generated a new checkpoint snapshot, save to disk
if writable && params.IsCheckpointInterval(snap.Number) && len(headers) > 0 {
if sb.governance.CanWriteGovernanceState(snap.Number) {
sb.governance.WriteGovernanceState(snap.Number, true)
}
if err = snap.store(sb.db); err != nil {
return nil, err
}
logger.Trace("Stored voting snapshot to disk", "number", snap.Number, "hash", snap.Hash)
}

sb.regen(chain, headers)

sb.recents.Add(snap.Hash, snap)
return snap, err
}

// regen commits snapshot data to database
// regen is triggered if there is any checkpoint block in the `headers`.
// For each checkpoint block, this function verifies the existence of its snapshot in DB and stores one if missing.
/*
Triggered:
| ^ ^ ^ ^ ...|
SI SI*(last snapshot) SI SI
| header1, .. headerN |
Not triggered: (Guaranteed SI* was committed before )
| ^ ^ ^ ^ ...|
SI SI*(last snapshot) SI SI
| header1, .. headerN |
*/
func (sb *backend) regen(chain consensus.ChainReader, headers []*types.Header) {
// Prevent nested call. Ignore header length one
// because it was handled before the `regen` called.
if !sb.isRestoringSnapshots.CompareAndSwap(false, true) || len(headers) <= 1 {
return
}
defer func() {
sb.isRestoringSnapshots.Store(false)
}()

var (
from = headers[0].Number.Uint64()
to = headers[len(headers)-1].Number.Uint64()
start = time.Now()
commitTried = false
)

// Shortcut: No missing snapshot data to be processed.
if to-(to%uint64(params.CheckpointInterval)) < from {
return
}

for _, header := range headers {
var (
hn = header.Number.Uint64()
hh = header.Hash()
)
if params.IsCheckpointInterval(hn) {
// Store snapshot data if it was not committed before
if loadSnap, _ := sb.db.ReadIstanbulSnapshot(hh); loadSnap != nil {
continue
}
snap, err := sb.snapshot(chain, hn, hh, nil, false)
if err != nil {
logger.Warn("[Snapshot] Snapshot restoring failed", "len(headers)", len(headers), "from", from, "to", to, "headerNumber", hn)
continue
}
if err = snap.store(sb.db); err != nil {
logger.Warn("[Snapshot] Snapshot restoring failed", "len(headers)", len(headers), "from", from, "to", to, "headerNumber", hn)
}
commitTried = true
}
}
if commitTried { // This prevents pushing too many logs by potential DoS attack
logger.Trace("[Snapshot] Snapshot restoring completed", "len(headers)", len(headers), "from", from, "to", to, "elapsed", time.Since(start))
}
}

// FIXME: Need to update this for Istanbul
// sigHash returns the hash which is used as input for the Istanbul
// signing. It is the hash of the entire header apart from the 65 byte signature
Expand Down
Loading

0 comments on commit 0a7a3b8

Please sign in to comment.