Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support auto recover when pbss meet unclean shutdown #125

Merged
merged 3 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions beacon/engine/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ var (
// - newPayloadV1: if the payload was accepted, but not processed (side chain)
ACCEPTED = "ACCEPTED"

// INCONSISTENT is returned by the engine API in the following calls:
// - newPayloadV1: if the payload block exists, but state missing
INCONSISTENT = "INCONSISTENT"

GenericServerError = &EngineAPIError{code: -32000, msg: "Server error"}
UnknownPayload = &EngineAPIError{code: -38001, msg: "Unknown payload"}
InvalidForkChoiceState = &EngineAPIError{code: -38002, msg: "Invalid forkchoice state"}
Expand Down
27 changes: 27 additions & 0 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2732,3 +2732,30 @@ func (bc *BlockChain) GetTrieFlushInterval() time.Duration {
func (bc *BlockChain) NoTries() bool {
return bc.stateCache.NoTries()
}

func createDelFn(bc *BlockChain) func(db ethdb.KeyValueWriter, hash common.Hash, num uint64) {
return func(db ethdb.KeyValueWriter, hash common.Hash, num uint64) {
// Ignore the error here since light client won't hit this path
frozen, _ := bc.db.Ancients()
if num+1 <= frozen {
log.Info("process data in freeze table")
// Truncate all relative data(header, total difficulty, body, receipt
// and canonical hash) from ancient store.
if _, err := bc.db.TruncateHead(num); err != nil {
log.Crit("Failed to truncate ancient data", "number", num, "err", err)
}
// Remove the hash <-> number mapping from the active store.
rawdb.DeleteHeaderNumber(db, hash)
} else {
// Remove relative body and receipts from the active store.
// The header, total difficulty and canonical hash will be
// removed in the hc.SetHead function.
rawdb.DeleteBody(db, hash, num)
rawdb.DeleteReceipts(db, hash, num)
}
}
}

func (bc *BlockChain) HeaderChainForceSetHead(headNumber uint64) {
bc.hc.SetHead(headNumber, nil, createDelFn(bc))
}
35 changes: 28 additions & 7 deletions eth/catalyst/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ import (

var (
forkchoiceUpdateAttributesTimer = metrics.NewRegisteredTimer("api/engine/forkchoiceUpdate/attributes", nil)
forkchoiceUpdateHeadsTimer = metrics.NewRegisteredTimer("api/engine/forkchoiceUpdate/heads", nil)
getPayloadTimer = metrics.NewRegisteredTimer("api/engine/get/payload", nil)
newPayloadTimer = metrics.NewRegisteredTimer("api/engine/new/payload", nil)
forkchoiceUpdateHeadsTimer = metrics.NewRegisteredTimer("api/engine/forkchoiceUpdate/heads", nil)
getPayloadTimer = metrics.NewRegisteredTimer("api/engine/get/payload", nil)
newPayloadTimer = metrics.NewRegisteredTimer("api/engine/new/payload", nil)
)

// Register adds the engine API to the full node.
Expand Down Expand Up @@ -367,8 +367,20 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl
log.Warn("Safe block not in canonical chain")
return engine.STATUS_INVALID, engine.InvalidForkChoiceState.With(errors.New("safe block not in canonical chain"))
}
//reset safe
currentSafe := api.eth.BlockChain().CurrentSafeBlock()
currentHead := api.eth.BlockChain().CurrentBlock()

// Set the safe block
api.eth.BlockChain().SetSafe(safeBlock.Header())

if shouldDeleteData(currentSafe, currentHead, safeBlock) {
log.Warn("deleting data beyond safe")
api.eth.BlockChain().HeaderChainForceSetHead(currentHead.Number.Uint64())
api.eth.BlockChain().SetFinalized(currentHead)
api.eth.BlockChain().SetSafe(currentHead)
}

}
// If payload generation was requested, create a new block to be potentially
// sealed by the beacon client. The payload will be requested later, and we
Expand Down Expand Up @@ -468,10 +480,10 @@ func (api *ConsensusAPI) GetPayloadV3(payloadID engine.PayloadID) (*engine.Execu

func (api *ConsensusAPI) getPayload(payloadID engine.PayloadID, full bool) (*engine.ExecutionPayloadEnvelope, error) {
start := time.Now()
defer func () {
defer func() {
getPayloadTimer.UpdateSince(start)
log.Debug("getPayloadTimer", "duration", common.PrettyDuration(time.Since(start)), "id", payloadID)
} ()
}()
log.Trace("Engine API request received", "method", "GetPayload", "id", payloadID)
data := api.localBlocks.get(payloadID, full)
if data == nil {
Expand Down Expand Up @@ -527,10 +539,10 @@ func (api *ConsensusAPI) NewPayloadV3(params engine.ExecutableData, versionedHas

func (api *ConsensusAPI) newPayload(params engine.ExecutableData, versionedHashes []common.Hash, beaconRoot *common.Hash) (engine.PayloadStatusV1, error) {
start := time.Now()
defer func () {
defer func() {
newPayloadTimer.UpdateSince(start)
log.Debug("newPayloadTimer", "duration", common.PrettyDuration(time.Since(start)), "parentHash", params.ParentHash)
} ()
}()

// The locking here is, strictly, not required. Without these locks, this can happen:
//
Expand Down Expand Up @@ -608,6 +620,11 @@ func (api *ConsensusAPI) newPayload(params engine.ExecutableData, versionedHashe
}
if !api.eth.BlockChain().HasBlockAndState(block.ParentHash(), block.NumberU64()-1) {
api.remoteBlocks.put(block.Hash(), block.Header())

if api.eth.BlockChain().TrieDB().Scheme() == rawdb.PathScheme {
log.Warn("State not available, missing trie node", "block", block.ParentHash().String())
return engine.PayloadStatusV1{Status: engine.INCONSISTENT}, nil
}
log.Warn("State not available, ignoring new payload")
return engine.PayloadStatusV1{Status: engine.ACCEPTED}, nil
}
Expand Down Expand Up @@ -877,3 +894,7 @@ func getBody(block *types.Block) *engine.ExecutionPayloadBodyV1 {
Withdrawals: withdrawals,
}
}

func shouldDeleteData(currentSafe *types.Header, currentHead *types.Header, safeBlock *types.Block) bool {
return currentSafe != nil && currentSafe.Number.Uint64() > currentHead.Number.Uint64() && currentSafe.Number.Uint64() > safeBlock.NumberU64()
}
Loading