Skip to content

Commit

Permalink
trie/triedb/pathdb: improve dirty node flushing trigger (#28426)
Browse files Browse the repository at this point in the history
* trie/triedb/pathdb: improve dirty node flushing trigger

* trie/triedb/pathdb: add tests

* trie/triedb/pathdb: address comment
  • Loading branch information
rjl493456442 authored and Francesco4203 committed Oct 29, 2024
1 parent 63a4de2 commit 4bf2454
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 38 deletions.
51 changes: 42 additions & 9 deletions trie/triedb/pathdb/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,15 @@ type tester struct {
snapStorages map[common.Hash]map[common.Hash]map[common.Hash][]byte
}

func newTester(t *testing.T) *tester {
func newTester(t *testing.T, historyLimit uint64) *tester {
var (
disk, _ = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false)
db = New(disk, &Config{CleanCacheSize: 256 * 1024, DirtyCacheSize: 256 * 1024})
obj = &tester{
db = New(disk, &Config{
StateHistory: historyLimit,
CleanCacheSize: 256 * 1024,
DirtyCacheSize: 256 * 1024,
})
obj = &tester{
db: db,
preimages: make(map[common.Hash]common.Address),
accounts: make(map[common.Hash][]byte),
Expand Down Expand Up @@ -377,7 +381,7 @@ func (t *tester) bottomIndex() int {

func TestDatabaseRollback(t *testing.T) {
// Verify state histories
tester := newTester(t)
tester := newTester(t, 0)
defer tester.release()

if err := tester.verifyHistory(); err != nil {
Expand All @@ -403,7 +407,7 @@ func TestDatabaseRollback(t *testing.T) {

func TestDatabaseRecoverable(t *testing.T) {
var (
tester = newTester(t)
tester = newTester(t, 0)
index = tester.bottomIndex()
)
defer tester.release()
Expand Down Expand Up @@ -441,7 +445,7 @@ func TestDatabaseRecoverable(t *testing.T) {
}

func TestDisable(t *testing.T) {
tester := newTester(t)
tester := newTester(t, 0)
defer tester.release()

_, stored := rawdb.ReadAccountTrieNode(tester.db.diskdb, nil)
Expand Down Expand Up @@ -477,7 +481,7 @@ func TestDisable(t *testing.T) {
}

func TestCommit(t *testing.T) {
tester := newTester(t)
tester := newTester(t, 0)
defer tester.release()

if err := tester.db.Commit(tester.lastHash(), false); err != nil {
Expand All @@ -501,7 +505,7 @@ func TestCommit(t *testing.T) {
}

func TestJournal(t *testing.T) {
tester := newTester(t)
tester := newTester(t, 0)
defer tester.release()

if err := tester.db.Journal(tester.lastHash()); err != nil {
Expand All @@ -525,7 +529,7 @@ func TestJournal(t *testing.T) {
}

func TestCorruptedJournal(t *testing.T) {
tester := newTester(t)
tester := newTester(t, 0)
defer tester.release()

if err := tester.db.Journal(tester.lastHash()); err != nil {
Expand Down Expand Up @@ -554,6 +558,35 @@ func TestCorruptedJournal(t *testing.T) {
}
}

// TestTailTruncateHistory function is designed to test a specific edge case where,
// when history objects are removed from the end, it should trigger a state flush
// if the ID of the new tail object is even higher than the persisted state ID.
//
// For example, let's say the ID of the persistent state is 10, and the current
// history objects range from ID(5) to ID(15). As we accumulate six more objects,
// the history will expand to cover ID(11) to ID(21). ID(11) then becomes the
// oldest history object, and its ID is even higher than the stored state.
//
// In this scenario, it is mandatory to update the persistent state before
// truncating the tail histories. This ensures that the ID of the persistent state
// always falls within the range of [oldest-history-id, latest-history-id].
func TestTailTruncateHistory(t *testing.T) {
tester := newTester(t, 10)
defer tester.release()

tester.db.Close()
tester.db = New(tester.db.diskdb, &Config{StateHistory: 10})

head, err := tester.db.freezer.Ancients()
if err != nil {
t.Fatalf("Failed to obtain freezer head")
}
stored := rawdb.ReadPersistentStateID(tester.db.diskdb)
if head != stored {
t.Fatalf("Failed to truncate excess history object above, stored: %d, head: %d", stored, head)
}
}

// copyAccounts returns a deep-copied account set of the provided one.
func copyAccounts(set map[common.Hash][]byte) map[common.Hash][]byte {
copied := make(map[common.Hash][]byte, len(set))
Expand Down
52 changes: 41 additions & 11 deletions trie/triedb/pathdb/disklayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,15 +172,30 @@ func (dl *diskLayer) update(root common.Hash, id uint64, block uint64, nodes map
func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
dl.lock.Lock()
defer dl.lock.Unlock()
// Construct and store the state history first. If crash happens
// after storing the state history but without flushing the
// corresponding states(journal), the stored state history will
// be truncated in the next restart.

// Construct and store the state history first. If crash happens after storing
// the state history but without flushing the corresponding states(journal),
// the stored state history will be truncated from head in the next restart.
var (
overflow bool
oldest uint64
)
if dl.db.freezer != nil {
err := writeHistory(dl.db.diskdb, dl.db.freezer, bottom, dl.db.config.StateHistory)
err := writeHistory(dl.db.freezer, bottom)
if err != nil {
return nil, err
}
// Determine if the persisted history object has exceeded the configured
// limitation, set the overflow as true if so.
tail, err := dl.db.freezer.Tail()
if err != nil {
return nil, err
}
limit := dl.db.config.StateHistory
if limit != 0 && bottom.stateID()-tail > limit {
overflow = true
oldest = bottom.stateID() - limit + 1 // track the id of history **after truncation**
}
}

// Mark the diskLayer as stale before applying any mutations on top.
Expand All @@ -194,15 +209,30 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
}
rawdb.WriteStateID(dl.db.diskdb, bottom.rootHash(), bottom.stateID())

// Construct a new disk layer by merging the nodes from the provided
// diff layer, and flush the content in disk layer if there are too
// many nodes cached. The clean cache is inherited from the original
// disk layer for reusing.
// Construct a new disk layer by merging the nodes from the provided diff
// layer, and flush the content in disk layer if there are too many nodes
// cached. The clean cache is inherited from the original disk layer.
ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.cleans, dl.buffer.commit(bottom.nodes))
err := ndl.buffer.flush(ndl.db.diskdb, ndl.cleans, ndl.id, force)
if err != nil {

// In a unique scenario where the ID of the oldest history object (after tail
// truncation) surpasses the persisted state ID, we take the necessary action
// of forcibly committing the cached dirty nodes to ensure that the persisted
// state ID remains higher.
if !force && rawdb.ReadPersistentStateID(dl.db.diskdb) < oldest {
force = true
}
if err := ndl.buffer.flush(ndl.db.diskdb, ndl.cleans, ndl.id, force); err != nil {
return nil, err
}
// To remove outdated history objects from the end, we set the 'tail' parameter
// to 'oldest-1' due to the offset between the freezer index and the history ID.
if overflow {
pruned, err := truncateFromTail(ndl.db.diskdb, ndl.db.freezer, oldest-1)
if err != nil {
return nil, err
}
log.Debug("Pruned state history", "items", pruned, "tailid", oldest)
}
return ndl, nil
}

Expand Down
26 changes: 8 additions & 18 deletions trie/triedb/pathdb/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,39 +530,29 @@ func readHistory(freezer *rawdb.ResettableFreezer, id uint64) (*history, error)
return &dec, nil
}

// writeHistory writes the state history with provided state set. After
// storing the corresponding state history, it will also prune the stale
// histories from the disk with the given threshold.
func writeHistory(db ethdb.KeyValueStore, freezer *rawdb.ResettableFreezer, dl *diffLayer, limit uint64) error {
// writeHistory persists the state history with the provided state set.
func writeHistory(freezer *rawdb.ResettableFreezer, dl *diffLayer) error {
// Short circuit if state set is not available.
if dl.states == nil {
return errors.New("state change set is not available")
}
var (
err error
n int
start = time.Now()
h = newHistory(dl.rootHash(), dl.parentLayer().rootHash(), dl.block, dl.states)
start = time.Now()
history = newHistory(dl.rootHash(), dl.parentLayer().rootHash(), dl.block, dl.states)
)
// Return byte streams of account and storage infor in current state.
accountData, storageData, accountIndex, storageIndex := h.encode()
accountData, storageData, accountIndex, storageIndex := history.encode()
dataSize := common.StorageSize(len(accountData) + len(storageData))
indexSize := common.StorageSize(len(accountIndex) + len(storageIndex))

// Write history data into five freezer table respectively.
rawdb.WriteStateHistory(freezer, dl.stateID(), h.meta.encode(), accountIndex, storageIndex, accountData, storageData)
rawdb.WriteStateHistory(freezer, dl.stateID(), history.meta.encode(), accountIndex, storageIndex, accountData, storageData)

// Prune stale state histories based on the config.
if limit != 0 && dl.stateID() > limit {
n, err = truncateFromTail(db, freezer, dl.stateID()-limit)
if err != nil {
return err
}
}
historyDataBytesMeter.Mark(int64(dataSize))
historyIndexBytesMeter.Mark(int64(indexSize))
historyBuildTimeMeter.UpdateSince(start)
log.Debug("Stored state history", "id", dl.stateID(), "block", dl.block, "data", dataSize, "index", indexSize, "pruned", n, "elapsed", common.PrettyDuration(time.Since(start)))
log.Debug("Stored state history", "id", dl.stateID(), "block", dl.block, "data", dataSize, "index", indexSize, "elapsed", common.PrettyDuration(time.Since(start)))

return nil
}

Expand Down

0 comments on commit 4bf2454

Please sign in to comment.