Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4844: bugfix and improve #2337

Merged
merged 15 commits into from
Mar 28, 2024
14 changes: 5 additions & 9 deletions consensus/parlia/parlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,10 +701,8 @@ func (p *Parlia) snapshot(chain consensus.ChainHeaderReader, number uint64, hash
}
}

// If we're at the genesis, snapshot the initial state. Alternatively if we have
// piled up more headers than allowed to be reorged (chain reinit from a freezer),
// consider the checkpoint trusted and snapshot it.
if number == 0 || (number%p.config.Epoch == 0 && (len(headers) > int(params.FullImmutabilityThreshold)/10)) {
buddh0 marked this conversation as resolved.
Show resolved Hide resolved
// If we're at the genesis, snapshot the initial state.
if number == 0 {
buddh0 marked this conversation as resolved.
Show resolved Hide resolved
checkpoint := chain.GetHeaderByNumber(number)
if checkpoint != nil {
// get checkpoint data
Expand All @@ -718,12 +716,10 @@ func (p *Parlia) snapshot(chain consensus.ChainHeaderReader, number uint64, hash

// new snapshot
snap = newSnapshot(p.config, p.signatures, number, hash, validators, voteAddrs, p.ethAPI)
if snap.Number%checkpointInterval == 0 { // snapshot will only be loaded when snap.Number%checkpointInterval == 0
if err := snap.store(p.db); err != nil {
return nil, err
}
log.Info("Stored checkpoint snapshot to disk", "number", number, "hash", hash)
if err := snap.store(p.db); err != nil {
return nil, err
}
log.Info("Stored checkpoint snapshot to disk", "number", number, "hash", hash)
break
}
}
Expand Down
5 changes: 3 additions & 2 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
// The header, total difficulty and canonical hash will be
// removed in the hc.SetHead function.
rawdb.DeleteBody(db, hash, num)
rawdb.DeleteBlobSidecars(db, hash, num)
galaio marked this conversation as resolved.
Show resolved Hide resolved
rawdb.DeleteReceipts(db, hash, num)
}
// Todo(rjl493456442) txlookup, bloombits, etc
Expand Down Expand Up @@ -1340,6 +1341,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
lastBlk := blockChain[len(blockChain)-1]
if bc.chainConfig.Parlia != nil && bc.chainConfig.IsCancun(lastBlk.Number(), lastBlk.Time()) {
if _, err := CheckDataAvailableInBatch(bc, blockChain); err != nil {
log.Debug("CheckDataAvailableInBatch", "err", err)
return 0, err
}
}
Expand Down Expand Up @@ -1404,8 +1406,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [

// Write all chain data to ancients.
td := bc.GetTd(first.Hash(), first.NumberU64())
writeSize, err := rawdb.WriteAncientBlocks(bc.db, blockChain, receiptChain, td)

writeSize, err := rawdb.WriteAncientBlocksWithBlobs(bc.db, blockChain, receiptChain, td)
if err != nil {
log.Error("Error importing chain data to ancients", "err", err)
return 0, err
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func (bc *BlockChain) GetSidecarsByHash(hash common.Hash) types.BlobSidecars {
if number == nil {
return nil
}
sidecars := rawdb.ReadRawBlobSidecars(bc.db, hash, *number)
sidecars := rawdb.ReadBlobSidecars(bc.db, hash, *number)
if sidecars == nil {
return nil
}
Expand Down
15 changes: 6 additions & 9 deletions core/data_availability.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
)

Expand Down Expand Up @@ -49,13 +48,10 @@ func validateBlobSidecar(hashes []common.Hash, sidecar *types.BlobSidecar) error
func IsDataAvailable(chain consensus.ChainHeaderReader, block *types.Block) (err error) {
// refer logic in ValidateBody
if !chain.Config().IsCancun(block.Number(), block.Time()) {
if block.Sidecars() == nil {
return nil
} else {
if block.Sidecars() != nil {
return errors.New("sidecars present in block body before cancun")
}
} else if block.Sidecars() == nil {
return errors.New("missing sidecars in block body after cancun")
return nil
}

// only required to check within MinBlocksForBlobRequests block's DA
Expand All @@ -64,15 +60,16 @@ func IsDataAvailable(chain consensus.ChainHeaderReader, block *types.Block) (err
if highest == nil || highest.Number.Cmp(current.Number) < 0 {
highest = current
}
defer func() {
log.Info("IsDataAvailable", "block", block.Number(), "hash", block.Hash(), "highest", highest.Number, "sidecars", len(block.Sidecars()), "err", err)
buddh0 marked this conversation as resolved.
Show resolved Hide resolved
}()
if block.NumberU64()+params.MinBlocksForBlobRequests < highest.Number.Uint64() {
// if we needn't check DA of this block, just clean it
block.CleanSidecars()
return nil
}

// if sidecar is nil, just clean it. And it will be used for saving in ancient.
if block.Sidecars() == nil {
block.CleanSidecars()
}
sidecars := block.Sidecars()
for _, s := range sidecars {
if err := s.SanityCheck(block.Number(), block.Hash()); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion core/data_availability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func TestIsDataAvailable(t *testing.T) {
}, nil),
chasingHead: params.MinBlocksForBlobRequests + 1,
withSidecar: false,
err: true,
err: false,
},
}

Expand Down
38 changes: 33 additions & 5 deletions core/rawdb/accessors_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,8 +798,9 @@ func WriteBlock(db ethdb.KeyValueWriter, block *types.Block) {
WriteHeader(db, block.Header())
}

// WriteAncientBlocks writes entire block data into ancient store and returns the total written size.
func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts []types.Receipts, td *big.Int) (int64, error) {
// WriteAncientBlocksWithBlobs writes entire block data with blobs into ancient store and returns the total written size.
func WriteAncientBlocksWithBlobs(db ethdb.AncientWriter, blocks []*types.Block, receipts []types.Receipts, td *big.Int) (int64, error) {
// find cancun index, it's used for new added blob ancient table
cancunIndex := -1
for i, block := range blocks {
if block.Sidecars() != nil {
Expand All @@ -808,12 +809,39 @@ func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts
}
}
log.Info("WriteAncientBlocks", "startAt", blocks[0].Number(), "cancunIndex", cancunIndex, "len", len(blocks))

var (
tdSum = new(big.Int).Set(td)
preSize int64
err error
)
if cancunIndex > 0 {
preSize, err = WriteAncientBlocks(db, blocks[:cancunIndex], receipts[:cancunIndex], td)
if err != nil {
return preSize, err
}
for i, block := range blocks[:cancunIndex] {
if i > 0 {
tdSum.Add(tdSum, block.Difficulty())
}
}
tdSum.Add(tdSum, blocks[cancunIndex].Difficulty())
}

// It will reset blob ancient table at cancunIndex
if cancunIndex >= 0 {
if err := ResetEmptyBlobAncientTable(db, blocks[cancunIndex].NumberU64()); err != nil {
if err = ResetEmptyBlobAncientTable(db, blocks[cancunIndex].NumberU64()); err != nil {
return 0, err
}
blocks = blocks[cancunIndex:]
receipts = receipts[cancunIndex:]
}
postSize, err := WriteAncientBlocks(db, blocks, receipts, tdSum)
return preSize + postSize, err
}

// WriteAncientBlocks writes entire block data into ancient store and returns the total written size.
func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts []types.Receipts, td *big.Int) (int64, error) {
var (
tdSum = new(big.Int).Set(td)
stReceipts []*types.ReceiptForStorage
Expand Down Expand Up @@ -853,8 +881,8 @@ func ReadBlobSidecarsRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.R
return data
}

// ReadRawBlobSidecars retrieves all the transaction blobs belonging to a block.
func ReadRawBlobSidecars(db ethdb.Reader, hash common.Hash, number uint64) types.BlobSidecars {
// ReadBlobSidecars retrieves all the transaction blobs belonging to a block.
func ReadBlobSidecars(db ethdb.Reader, hash common.Hash, number uint64) types.BlobSidecars {
galaio marked this conversation as resolved.
Show resolved Hide resolved
data := ReadBlobSidecarsRLP(db, hash, number)
if len(data) == 0 {
return nil
Expand Down
6 changes: 3 additions & 3 deletions core/rawdb/accessors_chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,13 +455,13 @@ func TestBlockBlobSidecarsStorage(t *testing.T) {
sidecars := types.BlobSidecars{types.NewBlobSidecarFromTx(tx1)}

// Check that no sidecars entries are in a pristine database
if bs := ReadRawBlobSidecars(db, blkHash, 0); len(bs) != 0 {
if bs := ReadBlobSidecars(db, blkHash, 0); len(bs) != 0 {
t.Fatalf("non existent sidecars returned: %v", bs)
}
WriteBody(db, blkHash, 0, body)
WriteBlobSidecars(db, blkHash, 0, sidecars)

if bs := ReadRawBlobSidecars(db, blkHash, 0); len(bs) == 0 {
if bs := ReadBlobSidecars(db, blkHash, 0); len(bs) == 0 {
t.Fatalf("no sidecars returned")
} else {
if err := checkBlobSidecarsRLP(bs, sidecars); err != nil {
Expand All @@ -470,7 +470,7 @@ func TestBlockBlobSidecarsStorage(t *testing.T) {
}

DeleteBlobSidecars(db, blkHash, 0)
if bs := ReadRawBlobSidecars(db, blkHash, 0); len(bs) != 0 {
if bs := ReadBlobSidecars(db, blkHash, 0); len(bs) != 0 {
t.Fatalf("deleted sidecars returned: %v", bs)
}
}
Expand Down
22 changes: 6 additions & 16 deletions core/rawdb/chain_freezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (f *chainFreezer) freeze(db ethdb.KeyValueStore) {
env, _ := f.freezeEnv.Load().(*ethdb.FreezerEnv)
// try prune blob data after cancun fork
if isCancun(env, head.Number, head.Time) {
f.tryPruneBlobAncient(env, *number)
f.tryPruneBlobAncientTable(env, *number)
}

// Avoid database thrashing with tiny writes
Expand All @@ -262,7 +262,7 @@ func (f *chainFreezer) freeze(db ethdb.KeyValueStore) {
}
}

func (f *chainFreezer) tryPruneBlobAncient(env *ethdb.FreezerEnv, num uint64) {
func (f *chainFreezer) tryPruneBlobAncientTable(env *ethdb.FreezerEnv, num uint64) {
extraReserve := getBlobExtraReserveFromEnv(env)
// It means that there is no need for pruning
if extraReserve == 0 {
Expand All @@ -273,13 +273,8 @@ func (f *chainFreezer) tryPruneBlobAncient(env *ethdb.FreezerEnv, num uint64) {
return
}
expectTail := num - reserveThreshold
h, err := f.TableAncients(ChainFreezerBlobSidecarTable)
if err != nil {
log.Error("Cannot get blob ancient head when prune", "block", num)
return
}
start := time.Now()
if err = f.ResetTable(ChainFreezerBlobSidecarTable, expectTail, h, false); err != nil {
if _, err := f.TruncateTableTail(ChainFreezerBlobSidecarTable, expectTail); err != nil {
log.Error("Cannot prune blob ancient", "block", num, "expectTail", expectTail, "err", err)
return
}
Expand Down Expand Up @@ -312,9 +307,8 @@ func (f *chainFreezer) freezeRangeWithBlobs(nfdb *nofreezedb, number, limit uint

var (
cancunNumber uint64
found bool
preHashes []common.Hash
)

for i := number; i <= limit; i++ {
hash := ReadCanonicalHash(nfdb, i)
if hash == (common.Hash{}) {
Expand All @@ -326,16 +320,12 @@ func (f *chainFreezer) freezeRangeWithBlobs(nfdb *nofreezedb, number, limit uint
}
if isCancun(env, h.Number, h.Time) {
cancunNumber = i
found = true
break
}
}
if !found {
galaio marked this conversation as resolved.
Show resolved Hide resolved
return f.freezeRange(nfdb, number, limit)
}

// freeze pre cancun
preHashes, err := f.freezeRange(nfdb, number, cancunNumber-1)
preHashes, err = f.freezeRange(nfdb, number, cancunNumber-1)
if err != nil {
return preHashes, err
}
Expand Down Expand Up @@ -432,5 +422,5 @@ func isCancun(env *ethdb.FreezerEnv, num *big.Int, time uint64) bool {
}

func ResetEmptyBlobAncientTable(db ethdb.AncientWriter, next uint64) error {
return db.ResetTable(ChainFreezerBlobSidecarTable, next, next, true)
return db.ResetTable(ChainFreezerBlobSidecarTable, next, true)
}
14 changes: 10 additions & 4 deletions core/rawdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,6 @@ func (db *nofreezedb) ModifyAncients(func(ethdb.AncientWriteOp) error) (int64, e
return 0, errNotSupported
}

func (db *nofreezedb) ResetTable(kind string, tail uint64, head uint64, onlyEmpty bool) error {
return errNotSupported
}

// TruncateHead returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) TruncateHead(items uint64) (uint64, error) {
return 0, errNotSupported
Expand All @@ -191,6 +187,16 @@ func (db *nofreezedb) TruncateTail(items uint64) (uint64, error) {
return 0, errNotSupported
}

// TruncateTableTail will truncate certain table to new tail
func (db *nofreezedb) TruncateTableTail(kind string, tail uint64) (uint64, error) {
return 0, errNotSupported
}

// ResetTable will reset certain table with new start point
func (db *nofreezedb) ResetTable(kind string, startAt uint64, onlyEmpty bool) error {
return errNotSupported
}

// Sync returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) Sync() error {
return errNotSupported
Expand Down
53 changes: 41 additions & 12 deletions core/rawdb/freezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func (f *Freezer) TruncateHead(items uint64) (uint64, error) {
if kind != ChainFreezerBlobSidecarTable {
return 0, err
}
nt, err := table.resetItems(items-f.offset, items-f.offset)
nt, err := table.resetItems(items - f.offset)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -489,7 +489,7 @@ func (f *Freezer) repair() error {
if kind != ChainFreezerBlobSidecarTable {
return err
}
nt, err := table.resetItems(head, head)
nt, err := table.resetItems(head)
if err != nil {
return err
}
Expand Down Expand Up @@ -698,34 +698,63 @@ func (f *Freezer) MigrateTable(kind string, convert convertLegacyFn) error {
return nil
}

// TruncateTableTail will truncate certain table to new tail
func (f *Freezer) TruncateTableTail(kind string, tail uint64) (uint64, error) {
if f.readonly {
return 0, errReadOnly
}

f.writeLock.Lock()
defer f.writeLock.Unlock()

if !slices.Contains(additionTables, kind) {
return 0, errors.New("only new added table could be truncated independently")
}
if tail < f.offset {
return 0, errors.New("the input tail&head is less than offset")
}
t, exist := f.tables[kind]
if !exist {
return 0, errors.New("you reset a non-exist table")
}

old := t.itemHidden.Load() + f.offset
if err := t.truncateTail(tail - f.offset); err != nil {
return 0, err
}
return old, nil
}

// ResetTable will reset certain table with new start point
// only used for ChainFreezerBlobSidecarTable now
func (f *Freezer) ResetTable(kind string, tail, head uint64, onlyEmpty bool) error {
func (f *Freezer) ResetTable(kind string, startAt uint64, onlyEmpty bool) error {
if f.readonly {
return errReadOnly
}
if err := f.Sync(); err != nil {
return err
}

f.writeLock.Lock()
defer f.writeLock.Unlock()
if tail < f.offset || head < f.offset {
return errors.New("the input tail&head is less than offset")
}
if _, exist := f.tables[kind]; !exist {

t, exist := f.tables[kind]
if !exist {
return errors.New("you reset a non-exist table")
}

// if you reset a non empty table just skip
if onlyEmpty && !EmptyTable(f.tables[kind]) {
if onlyEmpty && !EmptyTable(t) {
return nil
}

nt, err := f.tables[kind].resetItems(tail-f.offset, head-f.offset)
if err := f.Sync(); err != nil {
return err
}
nt, err := t.resetItems(startAt - f.offset)
if err != nil {
return err
}
f.tables[kind] = nt

// repair all tables with same tail & head
if err := f.repair(); err != nil {
for _, table := range f.tables {
table.Close()
Expand Down
Loading
Loading