diff --git a/consensus/istanbul/backend/backend.go b/consensus/istanbul/backend/backend.go
index ee7b46cd5..d2e0a424e 100644
--- a/consensus/istanbul/backend/backend.go
+++ b/consensus/istanbul/backend/backend.go
@@ -131,7 +131,7 @@ type backend struct {
 	// Protects the signer fields
 	candidatesLock sync.RWMutex
 	// Snapshots for recent block to speed up reorgs
-	recents *lru.ARCCache
+	recents *lru.ARCCache // TODO-kaiax: Remove snapshot cache
 	// event subscription for ChainHeadEvent event
 	broadcaster consensus.Broadcaster
 
diff --git a/consensus/istanbul/backend/engine.go b/consensus/istanbul/backend/engine.go
index f648ba96a..8b1dcb776 100644
--- a/consensus/istanbul/backend/engine.go
+++ b/consensus/istanbul/backend/engine.go
@@ -823,45 +823,10 @@ func (sb *backend) GetConsensusInfo(block *types.Block) (consensus.ConsensusInfo
 		return consensus.ConsensusInfo{}, nil
 	}
 
-	round := block.Header().Round()
-	view := &istanbul.View{
-		Sequence: new(big.Int).Set(block.Number()),
-		Round:    new(big.Int).SetInt64(int64(round)),
-	}
-
-	// get the proposer of this block.
-	proposer, err := ecrecover(block.Header())
-	if err != nil {
-		return consensus.ConsensusInfo{}, err
-	}
-
 	if sb.chain == nil {
 		return consensus.ConsensusInfo{}, errNoChainReader
 	}
 
-	// get the snapshot of the previous block.
-	parentHash := block.ParentHash()
-	snap, err := sb.snapshot(sb.chain, blockNumber-1, parentHash, nil, false)
-	if err != nil {
-		logger.Error("Failed to get snapshot.", "blockNum", blockNumber, "err", err)
-		return consensus.ConsensusInfo{}, errInternalError
-	}
-
-	// get origin proposer at 0 round.
-	originProposer := common.Address{}
-	lastProposer := sb.GetProposer(blockNumber - 1)
-
-	newValSet := snap.ValSet.Copy()
-	newValSet.CalcProposer(lastProposer, 0)
-	originProposer = newValSet.GetProposer().Address()
-
-	// get the committee list of this block at the view (blockNumber, round)
-	committee := snap.ValSet.SubListWithProposer(parentHash, proposer, view)
-	committeeAddrs := make([]common.Address, len(committee))
-	for i, v := range committee {
-		committeeAddrs[i] = v.Address()
-	}
-
 	// get the committers of this block from committed seals
 	extra, err := types.ExtractIstanbulExtra(block.Header())
 	if err != nil {
@@ -872,10 +837,18 @@ func (sb *backend) GetConsensusInfo(block *types.Block) (consensus.ConsensusInfo
 		return consensus.ConsensusInfo{}, err
 	}
 
+	round := block.Header().Round()
+
+	// get the committee list of this block (blockNumber, round)
+	currentRoundCState, err := sb.GetCommitteeState(block.NumberU64())
+	if err != nil {
+		logger.Error("Failed to get committee or proposer.", "blockNum", blockNumber, "round", uint64(round), "err", err)
+		return consensus.ConsensusInfo{}, errInternalError
+	}
 	// Uncomment to validate if committers are in the committee
 	// for _, recovered := range committers {
 	//	found := false
-	//	for _, calculated := range committeeAddrs {
+	//	for _, calculated := range currentRoundCState.Committee() {
 	//		if recovered == calculated {
 	//			found = true
 	//		}
@@ -885,11 +858,18 @@ func (sb *backend) GetConsensusInfo(block *types.Block) (consensus.ConsensusInfo
 	//	}
 	// }
 
+	// get origin proposer at 0 round.
+	roundZeroCState, err := sb.GetCommitteeStateByRound(blockNumber, 0)
+	if err != nil {
+		logger.Error("Failed to get committee or proposer.", "blockNum", blockNumber, "round", 0, "err", err)
+		return consensus.ConsensusInfo{}, errInternalError
+	}
+
 	cInfo := consensus.ConsensusInfo{
 		SigHash:        sigHash(block.Header()),
-		Proposer:       proposer,
-		OriginProposer: originProposer,
-		Committee:      committeeAddrs,
+		Proposer:       currentRoundCState.Proposer(),
+		OriginProposer: roundZeroCState.Proposer(),
+		Committee:      currentRoundCState.Committee().Copy(),
 		Committers:     committers,
 		Round:          round,
 	}
@@ -902,165 +882,6 @@ func (sb *backend) InitSnapshot() {
 	sb.blsPubkeyProvider.ResetBlsCache()
 }
 
-// prepareSnapshotApply is a helper function to prepare snapshot and headers for the given block number and hash.
-// It returns the snapshot, headers, and error if any.
-func (sb *backend) prepareSnapshotApply(chain consensus.ChainReader, number uint64, hash common.Hash, parents []*types.Header) (*Snapshot, []*types.Header, error) {
-	// Search for a snapshot in memory or on disk for checkpoints
-	var (
-		headers []*types.Header
-		snap    *Snapshot
-	)
-
-	for snap == nil {
-		// If an in-memory snapshot was found, use that
-		if s, ok := sb.recents.Get(hash); ok {
-			snap = s.(*Snapshot)
-			break
-		}
-		// If an on-disk checkpoint snapshot can be found, use that
-		if params.IsCheckpointInterval(number) {
-			if s, err := loadSnapshot(sb.db, hash); err == nil {
-				logger.Trace("Loaded voting snapshot form disk", "number", number, "hash", hash)
-				snap = s
-				break
-			}
-		}
-		// If we're at block zero, make a snapshot
-		if number == 0 {
-			var err error
-			if snap, err = sb.initSnapshot(chain); err != nil {
-				return nil, nil, err
-			}
-			break
-		}
-		// No snapshot for this header, gather the header and move backward
-		if header := getPrevHeaderAndUpdateParents(chain, number, hash, &parents); header == nil {
-			return nil, nil, consensus.ErrUnknownAncestor
-		} else {
-			headers = append(headers, header)
-			number, hash = number-1, header.ParentHash
-		}
-	}
-	// Previous snapshot found, apply any pending headers on top of it
-	for i := 0; i < len(headers)/2; i++ {
-		headers[i], headers[len(headers)-1-i] = headers[len(headers)-1-i], headers[i]
-	}
-
-	return snap, headers, nil
-}
-
-// GetKaiaHeadersForSnapshotApply returns the headers need to be applied to create snapshot for the given block number.
-// Note that it only returns headers for kaia fork enabled blocks.
-func (sb *backend) GetKaiaHeadersForSnapshotApply(chain consensus.ChainReader, number uint64, hash common.Hash, parents []*types.Header) ([]*types.Header, error) {
-	_, headers, err := sb.prepareSnapshotApply(chain, number, hash, parents)
-	if err != nil {
-		return nil, err
-	}
-
-	kaiaHeaders := []*types.Header{}
-	for i := 0; i < len(headers); i++ {
-		if chain.Config().IsKaiaForkEnabled(new(big.Int).Add(headers[i].Number, big.NewInt(1))) {
-			kaiaHeaders = headers[i:]
-			break
-		}
-	}
-
-	return kaiaHeaders, nil
-}
-
-// snapshot retrieves the state of the authorization voting at a given point in time.
-// There's in-memory snapshot and on-disk snapshot. On-disk snapshot is stored every checkpointInterval blocks.
-// Moreover, if the block has no in-memory or on-disk snapshot, before generating snapshot, it gathers the header and apply the vote in it.
-func (sb *backend) snapshot(chain consensus.ChainReader, number uint64, hash common.Hash, parents []*types.Header, writable bool) (*Snapshot, error) {
-	snap, headers, err := sb.prepareSnapshotApply(chain, number, hash, parents)
-	if err != nil {
-		return nil, err
-	}
-
-	pset := sb.govModule.EffectiveParamSet(snap.Number)
-	snap, err = snap.apply(headers, sb.governance, sb.govModule, sb.address, pset.ProposerPolicy, chain, sb.stakingModule, writable)
-	if err != nil {
-		return nil, err
-	}
-
-	// If we've generated a new checkpoint snapshot, save to disk
-	if writable && params.IsCheckpointInterval(snap.Number) && len(headers) > 0 {
-		if sb.governance.CanWriteGovernanceState(snap.Number) {
-			sb.governance.WriteGovernanceState(snap.Number, true)
-		}
-		if err = snap.store(sb.db); err != nil {
-			return nil, err
-		}
-		logger.Trace("Stored voting snapshot to disk", "number", snap.Number, "hash", snap.Hash)
-	}
-
-	sb.regen(chain, headers)
-
-	sb.recents.Add(snap.Hash, snap)
-	return snap, err
-}
-
-// regen commits snapshot data to database
-// regen is triggered if there is any checkpoint block in the `headers`.
-// For each checkpoint block, this function verifies the existence of its snapshot in DB and stores one if missing.
-/*
-   Triggered:
-       |   ^                ^      ^     ^ ...|
-       SI  SI*(last snapshot)      SI    SI
-           | header1, .. headerN |
-   Not triggered: (Guaranteed SI* was committed before )
-       |   ^                ^      ^     ^ ...|
-       SI  SI*(last snapshot)      SI    SI
-           | header1, .. headerN |
-*/
-func (sb *backend) regen(chain consensus.ChainReader, headers []*types.Header) {
-	// Prevent nested call. Ignore header length one
-	// because it was handled before the `regen` called.
-	if !sb.isRestoringSnapshots.CompareAndSwap(false, true) || len(headers) <= 1 {
-		return
-	}
-	defer func() {
-		sb.isRestoringSnapshots.Store(false)
-	}()
-
-	var (
-		from        = headers[0].Number.Uint64()
-		to          = headers[len(headers)-1].Number.Uint64()
-		start       = time.Now()
-		commitTried = false
-	)
-
-	// Shortcut: No missing snapshot data to be processed.
-	if to-(to%uint64(params.CheckpointInterval)) < from {
-		return
-	}
-
-	for _, header := range headers {
-		var (
-			hn = header.Number.Uint64()
-			hh = header.Hash()
-		)
-		if params.IsCheckpointInterval(hn) {
-			// Store snapshot data if it was not committed before
-			if loadSnap, _ := sb.db.ReadIstanbulSnapshot(hh); loadSnap != nil {
-				continue
-			}
-			snap, err := sb.snapshot(chain, hn, hh, nil, false)
-			if err != nil {
-				logger.Warn("[Snapshot] Snapshot restoring failed", "len(headers)", len(headers), "from", from, "to", to, "headerNumber", hn)
-				continue
-			}
-			if err = snap.store(sb.db); err != nil {
-				logger.Warn("[Snapshot] Snapshot restoring failed", "len(headers)", len(headers), "from", from, "to", to, "headerNumber", hn)
-			}
-			commitTried = true
-		}
-	}
-	if commitTried { // This prevents pushing too many logs by potential DoS attack
-		logger.Trace("[Snapshot] Snapshot restoring completed", "len(headers)", len(headers), "from", from, "to", to, "elapsed", time.Since(start))
-	}
-}
-
 // FIXME: Need to update this for Istanbul
 // sigHash returns the hash which is used as input for the Istanbul
 // signing. It is the hash of the entire header apart from the 65 byte signature
diff --git a/consensus/istanbul/backend/snapshot.go b/consensus/istanbul/backend/snapshot.go
index bbe3f118f..612f77316 100644
--- a/consensus/istanbul/backend/snapshot.go
+++ b/consensus/istanbul/backend/snapshot.go
@@ -26,6 +26,7 @@ import (
 	"bytes"
 	"encoding/json"
 	"math/big"
+	"time"
 
 	"github.com/kaiachain/kaia/blockchain/types"
 	"github.com/kaiachain/kaia/common"
@@ -379,3 +380,162 @@ func (s *Snapshot) MarshalJSON() ([]byte, error) {
 	j := s.toJSONStruct()
 	return json.Marshal(j)
 }
+
+// prepareSnapshotApply is a helper function to prepare the snapshot and headers for the given block number and hash.
+// It returns the snapshot, headers, and error if any.
+func (sb *backend) prepareSnapshotApply(chain consensus.ChainReader, number uint64, hash common.Hash, parents []*types.Header) (*Snapshot, []*types.Header, error) {
+	// Search for a snapshot in memory or on disk for checkpoints
+	var (
+		headers []*types.Header
+		snap    *Snapshot
+	)
+
+	for snap == nil {
+		// If an in-memory snapshot was found, use that
+		if s, ok := sb.recents.Get(hash); ok {
+			snap = s.(*Snapshot)
+			break
+		}
+		// If an on-disk checkpoint snapshot can be found, use that
+		if params.IsCheckpointInterval(number) {
+			if s, err := loadSnapshot(sb.db, hash); err == nil {
+				logger.Trace("Loaded voting snapshot from disk", "number", number, "hash", hash)
+				snap = s
+				break
+			}
+		}
+		// If we're at block zero, make a snapshot
+		if number == 0 {
+			var err error
+			if snap, err = sb.initSnapshot(chain); err != nil {
+				return nil, nil, err
+			}
+			break
+		}
+		// No snapshot for this header, gather the header and move backward
+		if header := getPrevHeaderAndUpdateParents(chain, number, hash, &parents); header == nil {
+			return nil, nil, consensus.ErrUnknownAncestor
+		} else {
+			headers = append(headers, header)
+			number, hash = number-1, header.ParentHash
+		}
+	}
+	// Previous snapshot found, apply any pending headers on top of it
+	for i := 0; i < len(headers)/2; i++ {
+		headers[i], headers[len(headers)-1-i] = headers[len(headers)-1-i], headers[i]
+	}
+
+	return snap, headers, nil
+}
+
+// GetKaiaHeadersForSnapshotApply returns the headers that need to be applied to create the snapshot for the given block number.
+// Note that it only returns headers for kaia fork enabled blocks.
+func (sb *backend) GetKaiaHeadersForSnapshotApply(chain consensus.ChainReader, number uint64, hash common.Hash, parents []*types.Header) ([]*types.Header, error) {
+	_, headers, err := sb.prepareSnapshotApply(chain, number, hash, parents)
+	if err != nil {
+		return nil, err
+	}
+
+	kaiaHeaders := []*types.Header{}
+	for i := 0; i < len(headers); i++ {
+		if chain.Config().IsKaiaForkEnabled(new(big.Int).Add(headers[i].Number, big.NewInt(1))) {
+			kaiaHeaders = headers[i:]
+			break
+		}
+	}
+
+	return kaiaHeaders, nil
+}
+
+// snapshot retrieves the state of the authorization voting at a given point in time.
+// There are in-memory and on-disk snapshots. An on-disk snapshot is stored every checkpointInterval blocks.
+// Moreover, if the block has no in-memory or on-disk snapshot, it gathers the headers and applies the votes in them before generating the snapshot.
+func (sb *backend) snapshot(chain consensus.ChainReader, number uint64, hash common.Hash, parents []*types.Header, writable bool) (*Snapshot, error) {
+	snap, headers, err := sb.prepareSnapshotApply(chain, number, hash, parents)
+	if err != nil {
+		return nil, err
+	}
+
+	pset := sb.govModule.EffectiveParamSet(snap.Number)
+	snap, err = snap.apply(headers, sb.governance, sb.govModule, sb.address, pset.ProposerPolicy, chain, sb.stakingModule, writable)
+	if err != nil {
+		return nil, err
+	}
+
+	// If we've generated a new checkpoint snapshot, save to disk
+	if writable && params.IsCheckpointInterval(snap.Number) && len(headers) > 0 {
+		if sb.governance.CanWriteGovernanceState(snap.Number) {
+			sb.governance.WriteGovernanceState(snap.Number, true)
+		}
+		if err = snap.store(sb.db); err != nil {
+			return nil, err
+		}
+		logger.Trace("Stored voting snapshot to disk", "number", snap.Number, "hash", snap.Hash)
+	}
+
+	sb.regen(chain, headers)
+
+	sb.recents.Add(snap.Hash, snap)
+	return snap, err
+}
+
+// regen commits snapshot data to the database.
+// regen is triggered if there is any checkpoint block in the `headers`.
+// For each checkpoint block, this function verifies the existence of its snapshot in DB and stores one if missing.
+/*
+   Triggered:
+       |   ^                ^      ^     ^ ...|
+       SI  SI*(last snapshot)      SI    SI
+           | header1, .. headerN |
+   Not triggered: (Guaranteed SI* was committed before )
+       |   ^                ^      ^     ^ ...|
+       SI  SI*(last snapshot)      SI    SI
+           | header1, .. headerN |
+*/
+func (sb *backend) regen(chain consensus.ChainReader, headers []*types.Header) {
+	// Prevent nested calls. Ignore a header list of length one
+	// because it was already handled before `regen` was called.
+	if !sb.isRestoringSnapshots.CompareAndSwap(false, true) || len(headers) <= 1 {
+		return
+	}
+	defer func() {
+		sb.isRestoringSnapshots.Store(false)
+	}()
+
+	var (
+		from        = headers[0].Number.Uint64()
+		to          = headers[len(headers)-1].Number.Uint64()
+		start       = time.Now()
+		commitTried = false
+	)
+
+	// Shortcut: No missing snapshot data to be processed.
+	if to-(to%uint64(params.CheckpointInterval)) < from {
+		return
+	}
+
+	for _, header := range headers {
+		var (
+			hn = header.Number.Uint64()
+			hh = header.Hash()
+		)
+		if params.IsCheckpointInterval(hn) {
+			// Store snapshot data if it was not committed before
+			if loadSnap, _ := sb.db.ReadIstanbulSnapshot(hh); loadSnap != nil {
+				continue
+			}
+			snap, err := sb.snapshot(chain, hn, hh, nil, false)
+			if err != nil {
+				logger.Warn("[Snapshot] Snapshot restoring failed", "len(headers)", len(headers), "from", from, "to", to, "headerNumber", hn)
+				continue
+			}
+			if err = snap.store(sb.db); err != nil {
+				logger.Warn("[Snapshot] Snapshot restoring failed", "len(headers)", len(headers), "from", from, "to", to, "headerNumber", hn)
+			}
+			commitTried = true
+		}
+	}
+	if commitTried { // This prevents pushing too many logs by a potential DoS attack
+		logger.Trace("[Snapshot] Snapshot restoring completed", "len(headers)", len(headers), "from", from, "to", to, "elapsed", time.Since(start))
+	}
+}
\ No newline at end of file
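
For reference, a minimal caller-side sketch of the refactored accessor. Only the GetConsensusInfo signature and the ConsensusInfo fields shown in the patch above are taken from the source; the consensusInfoReader interface, the helper name, and the github.com/kaiachain/kaia/consensus import path are assumptions made for this example.

// Illustrative sketch only; not part of the patch. It shows how the fields of
// consensus.ConsensusInfo are consumed after the refactor, where Proposer,
// OriginProposer, and Committee are now derived from the kaiax committee state
// rather than from a locally recomputed snapshot.
package main

import (
	"fmt"

	"github.com/kaiachain/kaia/blockchain/types"
	"github.com/kaiachain/kaia/consensus" // assumed import path for the consensus package referenced in the diff
)

// consensusInfoReader is a hypothetical narrow interface exposing only the
// engine method touched by this change.
type consensusInfoReader interface {
	GetConsensusInfo(block *types.Block) (consensus.ConsensusInfo, error)
}

// describeBlockConsensus prints the consensus metadata of a block; the helper
// name and output format are illustrative only.
func describeBlockConsensus(engine consensusInfoReader, block *types.Block) error {
	cInfo, err := engine.GetConsensusInfo(block)
	if err != nil {
		return err
	}
	fmt.Println("proposer       :", cInfo.Proposer.Hex())       // proposer at the round the block was finalized in
	fmt.Println("origin proposer:", cInfo.OriginProposer.Hex()) // proposer calculated for round 0
	fmt.Println("round          :", cInfo.Round)
	fmt.Println("committee size :", len(cInfo.Committee))
	fmt.Println("committer count:", len(cInfo.Committers))
	return nil
}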