Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support auto recover when pbss meet unclean shutdown #125

Merged
merged 3 commits into from
Jul 11, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: support auto recover when pbss meet unclean shutdown
krish-nr committed Jul 7, 2024
commit 345b0a27edff6132d10e850a9e65ec1939fa10ac
4 changes: 4 additions & 0 deletions beacon/engine/errors.go
Original file line number Diff line number Diff line change
@@ -74,6 +74,10 @@ var (
// - newPayloadV1: if the payload was accepted, but not processed (side chain)
ACCEPTED = "ACCEPTED"

// INCONSISTENT is returned by the engine API in the following calls:
// - newPayloadV1: if the payload block exists, but state missing
INCONSISTENT = "INCONSISTENT"

GenericServerError = &EngineAPIError{code: -32000, msg: "Server error"}
UnknownPayload = &EngineAPIError{code: -38001, msg: "Unknown payload"}
InvalidForkChoiceState = &EngineAPIError{code: -38002, msg: "Invalid forkchoice state"}
27 changes: 27 additions & 0 deletions core/blockchain.go
Original file line number Diff line number Diff line change
@@ -2732,3 +2732,30 @@ func (bc *BlockChain) GetTrieFlushInterval() time.Duration {
func (bc *BlockChain) NoTries() bool {
return bc.stateCache.NoTries()
}

func createDelFn(bc *BlockChain) func(db ethdb.KeyValueWriter, hash common.Hash, num uint64) {
return func(db ethdb.KeyValueWriter, hash common.Hash, num uint64) {
// Ignore the error here since light client won't hit this path
frozen, _ := bc.db.Ancients()
if num+1 <= frozen {
log.Info("process data in freeze table")
// Truncate all relative data(header, total difficulty, body, receipt
// and canonical hash) from ancient store.
if _, err := bc.db.TruncateHead(num); err != nil {
log.Crit("Failed to truncate ancient data", "number", num, "err", err)
}
// Remove the hash <-> number mapping from the active store.
rawdb.DeleteHeaderNumber(db, hash)
} else {
// Remove relative body and receipts from the active store.
// The header, total difficulty and canonical hash will be
// removed in the hc.SetHead function.
rawdb.DeleteBody(db, hash, num)
rawdb.DeleteReceipts(db, hash, num)
}
}
}

func (bc *BlockChain) HeaderChainForceSetHead(headNumber uint64) {
bc.hc.SetHead(headNumber, nil, createDelFn(bc))
}
35 changes: 28 additions & 7 deletions eth/catalyst/api.go
Original file line number Diff line number Diff line change
@@ -40,9 +40,9 @@ import (

var (
forkchoiceUpdateAttributesTimer = metrics.NewRegisteredTimer("api/engine/forkchoiceUpdate/attributes", nil)
forkchoiceUpdateHeadsTimer = metrics.NewRegisteredTimer("api/engine/forkchoiceUpdate/heads", nil)
getPayloadTimer = metrics.NewRegisteredTimer("api/engine/get/payload", nil)
newPayloadTimer = metrics.NewRegisteredTimer("api/engine/new/payload", nil)
forkchoiceUpdateHeadsTimer = metrics.NewRegisteredTimer("api/engine/forkchoiceUpdate/heads", nil)
getPayloadTimer = metrics.NewRegisteredTimer("api/engine/get/payload", nil)
newPayloadTimer = metrics.NewRegisteredTimer("api/engine/new/payload", nil)
)

// Register adds the engine API to the full node.
@@ -367,8 +367,20 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl
log.Warn("Safe block not in canonical chain")
return engine.STATUS_INVALID, engine.InvalidForkChoiceState.With(errors.New("safe block not in canonical chain"))
}
//reset safe
currentSafe := api.eth.BlockChain().CurrentSafeBlock()
currentHead := api.eth.BlockChain().CurrentBlock()

// Set the safe block
api.eth.BlockChain().SetSafe(safeBlock.Header())

if shouldDeleteData(currentSafe, currentHead, safeBlock) {
log.Warn("deleting data beyond safe")
api.eth.BlockChain().HeaderChainForceSetHead(currentHead.Number.Uint64())
api.eth.BlockChain().SetFinalized(currentHead)
api.eth.BlockChain().SetSafe(currentHead)
}

}
// If payload generation was requested, create a new block to be potentially
// sealed by the beacon client. The payload will be requested later, and we
@@ -468,10 +480,10 @@ func (api *ConsensusAPI) GetPayloadV3(payloadID engine.PayloadID) (*engine.Execu

func (api *ConsensusAPI) getPayload(payloadID engine.PayloadID, full bool) (*engine.ExecutionPayloadEnvelope, error) {
start := time.Now()
defer func () {
defer func() {
getPayloadTimer.UpdateSince(start)
log.Debug("getPayloadTimer", "duration", common.PrettyDuration(time.Since(start)), "id", payloadID)
} ()
}()
log.Trace("Engine API request received", "method", "GetPayload", "id", payloadID)
data := api.localBlocks.get(payloadID, full)
if data == nil {
@@ -527,10 +539,10 @@ func (api *ConsensusAPI) NewPayloadV3(params engine.ExecutableData, versionedHas

func (api *ConsensusAPI) newPayload(params engine.ExecutableData, versionedHashes []common.Hash, beaconRoot *common.Hash) (engine.PayloadStatusV1, error) {
start := time.Now()
defer func () {
defer func() {
newPayloadTimer.UpdateSince(start)
log.Debug("newPayloadTimer", "duration", common.PrettyDuration(time.Since(start)), "parentHash", params.ParentHash)
} ()
}()

// The locking here is, strictly, not required. Without these locks, this can happen:
//
@@ -608,6 +620,11 @@ func (api *ConsensusAPI) newPayload(params engine.ExecutableData, versionedHashe
}
if !api.eth.BlockChain().HasBlockAndState(block.ParentHash(), block.NumberU64()-1) {
api.remoteBlocks.put(block.Hash(), block.Header())

if api.eth.BlockChain().TrieDB().Scheme() == rawdb.PathScheme {
log.Warn("State not available, missing trie node", "block", block.ParentHash().String())
return engine.PayloadStatusV1{Status: engine.INCONSISTENT}, nil
}
log.Warn("State not available, ignoring new payload")
return engine.PayloadStatusV1{Status: engine.ACCEPTED}, nil
}
@@ -877,3 +894,7 @@ func getBody(block *types.Block) *engine.ExecutionPayloadBodyV1 {
Withdrawals: withdrawals,
}
}

func shouldDeleteData(currentSafe *types.Header, currentHead *types.Header, safeBlock *types.Block) bool {
return currentSafe.Number.Uint64() > currentHead.Number.Uint64() && currentSafe.Number.Uint64() > safeBlock.NumberU64()
owen-reorg marked this conversation as resolved.
Show resolved Hide resolved
}