Skip to content

Commit

Permalink
trie/triedb/pathdb, core/rawdb: pbss fix release v1.13.5 (corner-case…
Browse files Browse the repository at this point in the history
…s in path scheme state management) (#619)

* trie/triedb/pathdb, core/rawdb: enhance error message in freezer (#28198)

This PR adds more error message for debugging purpose.

* trie/triedb/pathdb: improve dirty node flushing trigger (#28426)

* trie/triedb/pathdb: improve dirty node flushing trigger

* trie/triedb/pathdb: add tests

* trie/triedb/pathdb: address comment

* core/rawdb: fsync the index file after each freezer write (#28483)

* core/rawdb: fsync the index and data file after each freezer write

* core/rawdb: fsync the data file in freezer after write

---------

Co-authored-by: rjl493456442 <[email protected]>
  • Loading branch information
Francesco4203 and rjl493456442 authored Oct 29, 2024
1 parent 661778c commit 8b18ecb
Show file tree
Hide file tree
Showing 9 changed files with 194 additions and 53 deletions.
3 changes: 3 additions & 0 deletions core/rawdb/ancient_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package rawdb

import (
"fmt"
"path/filepath"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
Expand Down Expand Up @@ -124,6 +125,8 @@ func InspectFreezerTable(ancient string, freezerName string, tableName string, s
switch freezerName {
case chainFreezerName:
path, tables = resolveChainFreezerDir(ancient), chainFreezerNoSnappy
case stateFreezerName:
path, tables = filepath.Join(ancient, freezerName), stateFreezerNoSnappy
default:
return fmt.Errorf("unknown freezer, supported ones: %v", freezers)
}
Expand Down
12 changes: 10 additions & 2 deletions core/rawdb/freezer_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,19 +183,27 @@ func (batch *freezerTableBatch) maybeCommit() error {

// commit writes the batched items to the backing freezerTable.
func (batch *freezerTableBatch) commit() error {
// Write data.
// Write data. The head file is fsync'd after write to ensure the
// data is truly transferred to disk.
_, err := batch.t.head.Write(batch.dataBuffer)
if err != nil {
return err
}
if err := batch.t.head.Sync(); err != nil {
return err
}
dataSize := int64(len(batch.dataBuffer))
batch.dataBuffer = batch.dataBuffer[:0]

// Write index.
// Write indices. The index file is fsync'd after write to ensure the
// data indexes are truly transferred to disk.
_, err = batch.t.index.Write(batch.indexBuffer)
if err != nil {
return err
}
if err := batch.t.index.Sync(); err != nil {
return err
}
indexSize := int64(len(batch.indexBuffer))
batch.indexBuffer = batch.indexBuffer[:0]

Expand Down
3 changes: 1 addition & 2 deletions core/rawdb/freezer_resettable.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,7 @@ func cleanup(pathToDelete string) error {

for _, name := range names {
if name == filepath.Base(pathToDelete)+tmpSuffix {
// Figure out then delete the tmp directory which is renamed in Reset Method.
log.Info("Cleaning up the freezer Reset directory", "pathToDelete", pathToDelete, "total files inside", len(names))
log.Info("Removed leftover freezer directory", "name", name)
return os.RemoveAll(filepath.Join(parentDir, name))
}
}
Expand Down
28 changes: 24 additions & 4 deletions core/rawdb/freezer_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,9 @@ func (t *freezerTable) repair() error {
}
// Ensure the index is a multiple of indexEntrySize bytes
if overflow := stat.Size() % indexEntrySize; overflow != 0 {
truncateFreezerFile(t.index, stat.Size()-overflow) // New file can't trigger this path
if err := truncateFreezerFile(t.index, stat.Size()-overflow); err != nil {
return err
} // New file can't trigger this path
}
// Retrieve the file sizes and prepare for truncation
if stat, err = t.index.Stat(); err != nil {
Expand Down Expand Up @@ -238,6 +240,12 @@ func (t *freezerTable) repair() error {
t.index.ReadAt(buffer, offsetsSize-indexEntrySize)
lastIndex.unmarshalBinary(buffer)
}
// Print an error log if the index is corrupted due to an incorrect
// last index item. While it is theoretically possible to have a zero offset
// by storing all zero-size items, it is highly unlikely to occur in practice.
if lastIndex.offset == 0 && offsetsSize/indexEntrySize > 1 {
log.Error("Corrupted index file detected", "lastOffset", lastIndex.offset, "indexes", offsetsSize/indexEntrySize)
}
t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForAppend)
if err != nil {
return err
Expand Down Expand Up @@ -372,6 +380,9 @@ func (t *freezerTable) truncateHead(items uint64) error {
if err := truncateFreezerFile(t.index, int64(length+1)*indexEntrySize); err != nil {
return err
}
if err := t.index.Sync(); err != nil {
return err
}
var expected indexEntry
if length == 0 {
expected = indexEntry{filenum: t.tailId, offset: 0}
Expand All @@ -394,13 +405,17 @@ func (t *freezerTable) truncateHead(items uint64) error {
// Release any files _after the current head -- both the previous head
// and any files which may have been opened for reading
t.releaseFilesAfter(expected.filenum, true)

// Set back the historic head
t.head = newHead
t.headId = expected.filenum
}
if err := truncateFreezerFile(t.head, int64(expected.offset)); err != nil {
return err
}
if err := t.head.Sync(); err != nil {
return err
}
// All data files truncated, set internal counters and return
t.headBytes = int64(expected.offset)
t.items.Store(items)
Expand Down Expand Up @@ -491,6 +506,10 @@ func (t *freezerTable) truncateTail(items uint64) error {
if err := t.meta.Sync(); err != nil {
return err
}
// Close the index file before shorten it.
if err := t.index.Close(); err != nil {
return err
}
// Truncate the deleted index entries from the index file. It overwrites the entries in current index file.
err = copyFrom(t.index.Name(), t.index.Name(), indexEntrySize*(newDeleted-deleted+1), func(f *os.File) error {
tailIndex := indexEntry{
Expand All @@ -504,13 +523,14 @@ func (t *freezerTable) truncateTail(items uint64) error {
return err
}
// Reopen the modified index file to load the changes
if err := t.index.Close(); err != nil {
return err
}
t.index, err = openFreezerFileForAppend(t.index.Name())
if err != nil {
return err
}
// Sync the file to ensure changes are flushed to disk
if err := t.index.Sync(); err != nil {
return err
}
// Release/Delete any files before the current tail
t.tailId = newTailId
t.itemOffset.Store(newDeleted)
Expand Down
6 changes: 1 addition & 5 deletions core/rawdb/freezer_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,7 @@ func copyFrom(srcPath, destPath string, offset uint64, beforeCopyFunc func(f *os
return err
}
f = nil

if err := os.Rename(fname, destPath); err != nil {
return err
}
return nil
return os.Rename(fname, destPath)
}

// openFreezerFileForAppend opens a freezer table file and seeks to the end, if it's not exist, create it.
Expand Down
51 changes: 42 additions & 9 deletions trie/triedb/pathdb/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,15 @@ type tester struct {
snapStorages map[common.Hash]map[common.Hash]map[common.Hash][]byte
}

func newTester(t *testing.T) *tester {
func newTester(t *testing.T, historyLimit uint64) *tester {
var (
disk, _ = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false)
db = New(disk, &Config{CleanCacheSize: 256 * 1024, DirtyCacheSize: 256 * 1024})
obj = &tester{
db = New(disk, &Config{
StateHistory: historyLimit,
CleanCacheSize: 256 * 1024,
DirtyCacheSize: 256 * 1024,
})
obj = &tester{
db: db,
preimages: make(map[common.Hash]common.Address),
accounts: make(map[common.Hash][]byte),
Expand Down Expand Up @@ -377,7 +381,7 @@ func (t *tester) bottomIndex() int {

func TestDatabaseRollback(t *testing.T) {
// Verify state histories
tester := newTester(t)
tester := newTester(t, 0)
defer tester.release()

if err := tester.verifyHistory(); err != nil {
Expand All @@ -403,7 +407,7 @@ func TestDatabaseRollback(t *testing.T) {

func TestDatabaseRecoverable(t *testing.T) {
var (
tester = newTester(t)
tester = newTester(t, 0)
index = tester.bottomIndex()
)
defer tester.release()
Expand Down Expand Up @@ -441,7 +445,7 @@ func TestDatabaseRecoverable(t *testing.T) {
}

func TestDisable(t *testing.T) {
tester := newTester(t)
tester := newTester(t, 0)
defer tester.release()

_, stored := rawdb.ReadAccountTrieNode(tester.db.diskdb, nil)
Expand Down Expand Up @@ -477,7 +481,7 @@ func TestDisable(t *testing.T) {
}

func TestCommit(t *testing.T) {
tester := newTester(t)
tester := newTester(t, 0)
defer tester.release()

if err := tester.db.Commit(tester.lastHash(), false); err != nil {
Expand All @@ -501,7 +505,7 @@ func TestCommit(t *testing.T) {
}

func TestJournal(t *testing.T) {
tester := newTester(t)
tester := newTester(t, 0)
defer tester.release()

if err := tester.db.Journal(tester.lastHash()); err != nil {
Expand All @@ -525,7 +529,7 @@ func TestJournal(t *testing.T) {
}

func TestCorruptedJournal(t *testing.T) {
tester := newTester(t)
tester := newTester(t, 0)
defer tester.release()

if err := tester.db.Journal(tester.lastHash()); err != nil {
Expand Down Expand Up @@ -554,6 +558,35 @@ func TestCorruptedJournal(t *testing.T) {
}
}

// TestTailTruncateHistory function is designed to test a specific edge case where,
// when history objects are removed from the end, it should trigger a state flush
// if the ID of the new tail object is even higher than the persisted state ID.
//
// For example, let's say the ID of the persistent state is 10, and the current
// history objects range from ID(5) to ID(15). As we accumulate six more objects,
// the history will expand to cover ID(11) to ID(21). ID(11) then becomes the
// oldest history object, and its ID is even higher than the stored state.
//
// In this scenario, it is mandatory to update the persistent state before
// truncating the tail histories. This ensures that the ID of the persistent state
// always falls within the range of [oldest-history-id, latest-history-id].
func TestTailTruncateHistory(t *testing.T) {
tester := newTester(t, 10)
defer tester.release()

tester.db.Close()
tester.db = New(tester.db.diskdb, &Config{StateHistory: 10})

head, err := tester.db.freezer.Ancients()
if err != nil {
t.Fatalf("Failed to obtain freezer head")
}
stored := rawdb.ReadPersistentStateID(tester.db.diskdb)
if head != stored {
t.Fatalf("Failed to truncate excess history object above, stored: %d, head: %d", stored, head)
}
}

// copyAccounts returns a deep-copied account set of the provided one.
func copyAccounts(set map[common.Hash][]byte) map[common.Hash][]byte {
copied := make(map[common.Hash][]byte, len(set))
Expand Down
52 changes: 41 additions & 11 deletions trie/triedb/pathdb/disklayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,15 +172,30 @@ func (dl *diskLayer) update(root common.Hash, id uint64, block uint64, nodes map
func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
dl.lock.Lock()
defer dl.lock.Unlock()
// Construct and store the state history first. If crash happens
// after storing the state history but without flushing the
// corresponding states(journal), the stored state history will
// be truncated in the next restart.

// Construct and store the state history first. If crash happens after storing
// the state history but without flushing the corresponding states(journal),
// the stored state history will be truncated from head in the next restart.
var (
overflow bool
oldest uint64
)
if dl.db.freezer != nil {
err := writeHistory(dl.db.diskdb, dl.db.freezer, bottom, dl.db.config.StateHistory)
err := writeHistory(dl.db.freezer, bottom)
if err != nil {
return nil, err
}
// Determine if the persisted history object has exceeded the configured
// limitation, set the overflow as true if so.
tail, err := dl.db.freezer.Tail()
if err != nil {
return nil, err
}
limit := dl.db.config.StateHistory
if limit != 0 && bottom.stateID()-tail > limit {
overflow = true
oldest = bottom.stateID() - limit + 1 // track the id of history **after truncation**
}
}

// Mark the diskLayer as stale before applying any mutations on top.
Expand All @@ -194,15 +209,30 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
}
rawdb.WriteStateID(dl.db.diskdb, bottom.rootHash(), bottom.stateID())

// Construct a new disk layer by merging the nodes from the provided
// diff layer, and flush the content in disk layer if there are too
// many nodes cached. The clean cache is inherited from the original
// disk layer for reusing.
// Construct a new disk layer by merging the nodes from the provided diff
// layer, and flush the content in disk layer if there are too many nodes
// cached. The clean cache is inherited from the original disk layer.
ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.cleans, dl.buffer.commit(bottom.nodes))
err := ndl.buffer.flush(ndl.db.diskdb, ndl.cleans, ndl.id, force)
if err != nil {

// In a unique scenario where the ID of the oldest history object (after tail
// truncation) surpasses the persisted state ID, we take the necessary action
// of forcibly committing the cached dirty nodes to ensure that the persisted
// state ID remains higher.
if !force && rawdb.ReadPersistentStateID(dl.db.diskdb) < oldest {
force = true
}
if err := ndl.buffer.flush(ndl.db.diskdb, ndl.cleans, ndl.id, force); err != nil {
return nil, err
}
// To remove outdated history objects from the end, we set the 'tail' parameter
// to 'oldest-1' due to the offset between the freezer index and the history ID.
if overflow {
pruned, err := truncateFromTail(ndl.db.diskdb, ndl.db.freezer, oldest-1)
if err != nil {
return nil, err
}
log.Debug("Pruned state history", "items", pruned, "tailid", oldest)
}
return ndl, nil
}

Expand Down
Loading

0 comments on commit 8b18ecb

Please sign in to comment.