Skip to content

Commit

Permalink
feat: support auto recover when pbss meet unclean shutdown (#125)
Browse files Browse the repository at this point in the history
Co-authored-by: Owen <[email protected]>
  • Loading branch information
krish-nr and owen-reorg authored Jul 11, 2024
1 parent aead14e commit 887404f
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 7 deletions.
4 changes: 4 additions & 0 deletions beacon/engine/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ var (
// - newPayloadV1: if the payload was accepted, but not processed (side chain)
ACCEPTED = "ACCEPTED"

// INCONSISTENT is returned by the engine API in the following calls:
// - newPayloadV1: if the payload block exists, but state missing
INCONSISTENT = "INCONSISTENT"

GenericServerError = &EngineAPIError{code: -32000, msg: "Server error"}
UnknownPayload = &EngineAPIError{code: -38001, msg: "Unknown payload"}
InvalidForkChoiceState = &EngineAPIError{code: -38002, msg: "Invalid forkchoice state"}
Expand Down
27 changes: 27 additions & 0 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2732,3 +2732,30 @@ func (bc *BlockChain) GetTrieFlushInterval() time.Duration {
func (bc *BlockChain) NoTries() bool {
return bc.stateCache.NoTries()
}

func createDelFn(bc *BlockChain) func(db ethdb.KeyValueWriter, hash common.Hash, num uint64) {
return func(db ethdb.KeyValueWriter, hash common.Hash, num uint64) {
// Ignore the error here since light client won't hit this path
frozen, _ := bc.db.Ancients()
if num+1 <= frozen {
log.Info("process data in freeze table")
// Truncate all relative data(header, total difficulty, body, receipt
// and canonical hash) from ancient store.
if _, err := bc.db.TruncateHead(num); err != nil {
log.Crit("Failed to truncate ancient data", "number", num, "err", err)
}
// Remove the hash <-> number mapping from the active store.
rawdb.DeleteHeaderNumber(db, hash)
} else {
// Remove relative body and receipts from the active store.
// The header, total difficulty and canonical hash will be
// removed in the hc.SetHead function.
rawdb.DeleteBody(db, hash, num)
rawdb.DeleteReceipts(db, hash, num)
}
}
}

func (bc *BlockChain) HeaderChainForceSetHead(headNumber uint64) {
bc.hc.SetHead(headNumber, nil, createDelFn(bc))
}
35 changes: 28 additions & 7 deletions eth/catalyst/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ import (

var (
forkchoiceUpdateAttributesTimer = metrics.NewRegisteredTimer("api/engine/forkchoiceUpdate/attributes", nil)
forkchoiceUpdateHeadsTimer = metrics.NewRegisteredTimer("api/engine/forkchoiceUpdate/heads", nil)
getPayloadTimer = metrics.NewRegisteredTimer("api/engine/get/payload", nil)
newPayloadTimer = metrics.NewRegisteredTimer("api/engine/new/payload", nil)
forkchoiceUpdateHeadsTimer = metrics.NewRegisteredTimer("api/engine/forkchoiceUpdate/heads", nil)
getPayloadTimer = metrics.NewRegisteredTimer("api/engine/get/payload", nil)
newPayloadTimer = metrics.NewRegisteredTimer("api/engine/new/payload", nil)
)

// Register adds the engine API to the full node.
Expand Down Expand Up @@ -367,8 +367,20 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl
log.Warn("Safe block not in canonical chain")
return engine.STATUS_INVALID, engine.InvalidForkChoiceState.With(errors.New("safe block not in canonical chain"))
}
//reset safe
currentSafe := api.eth.BlockChain().CurrentSafeBlock()
currentHead := api.eth.BlockChain().CurrentBlock()

// Set the safe block
api.eth.BlockChain().SetSafe(safeBlock.Header())

if shouldDeleteData(currentSafe, currentHead, safeBlock) {
log.Warn("deleting data beyond safe")
api.eth.BlockChain().HeaderChainForceSetHead(currentHead.Number.Uint64())
api.eth.BlockChain().SetFinalized(currentHead)
api.eth.BlockChain().SetSafe(currentHead)
}

}
// If payload generation was requested, create a new block to be potentially
// sealed by the beacon client. The payload will be requested later, and we
Expand Down Expand Up @@ -468,10 +480,10 @@ func (api *ConsensusAPI) GetPayloadV3(payloadID engine.PayloadID) (*engine.Execu

func (api *ConsensusAPI) getPayload(payloadID engine.PayloadID, full bool) (*engine.ExecutionPayloadEnvelope, error) {
start := time.Now()
defer func () {
defer func() {
getPayloadTimer.UpdateSince(start)
log.Debug("getPayloadTimer", "duration", common.PrettyDuration(time.Since(start)), "id", payloadID)
} ()
}()
log.Trace("Engine API request received", "method", "GetPayload", "id", payloadID)
data := api.localBlocks.get(payloadID, full)
if data == nil {
Expand Down Expand Up @@ -527,10 +539,10 @@ func (api *ConsensusAPI) NewPayloadV3(params engine.ExecutableData, versionedHas

func (api *ConsensusAPI) newPayload(params engine.ExecutableData, versionedHashes []common.Hash, beaconRoot *common.Hash) (engine.PayloadStatusV1, error) {
start := time.Now()
defer func () {
defer func() {
newPayloadTimer.UpdateSince(start)
log.Debug("newPayloadTimer", "duration", common.PrettyDuration(time.Since(start)), "parentHash", params.ParentHash)
} ()
}()

// The locking here is, strictly, not required. Without these locks, this can happen:
//
Expand Down Expand Up @@ -608,6 +620,11 @@ func (api *ConsensusAPI) newPayload(params engine.ExecutableData, versionedHashe
}
if !api.eth.BlockChain().HasBlockAndState(block.ParentHash(), block.NumberU64()-1) {
api.remoteBlocks.put(block.Hash(), block.Header())

if api.eth.BlockChain().TrieDB().Scheme() == rawdb.PathScheme {
log.Warn("State not available, missing trie node", "block", block.ParentHash().String())
return engine.PayloadStatusV1{Status: engine.INCONSISTENT}, nil
}
log.Warn("State not available, ignoring new payload")
return engine.PayloadStatusV1{Status: engine.ACCEPTED}, nil
}
Expand Down Expand Up @@ -877,3 +894,7 @@ func getBody(block *types.Block) *engine.ExecutionPayloadBodyV1 {
Withdrawals: withdrawals,
}
}

func shouldDeleteData(currentSafe *types.Header, currentHead *types.Header, safeBlock *types.Block) bool {
return currentSafe != nil && currentSafe.Number.Uint64() > currentHead.Number.Uint64() && currentSafe.Number.Uint64() > safeBlock.NumberU64()
}

0 comments on commit 887404f

Please sign in to comment.