cherry pick pbss patches from go-ethereum #1962

Merged (2 commits) on Nov 3, 2023
12 changes: 11 additions & 1 deletion eth/backend.go
@@ -63,6 +63,7 @@ import (
 	"github.com/ethereum/go-ethereum/params"
 	"github.com/ethereum/go-ethereum/rlp"
 	"github.com/ethereum/go-ethereum/rpc"
+	"github.com/ethereum/go-ethereum/trie/triedb/pathdb"
 )
 
 // Config contains the configuration options of the ETH protocol.
@@ -129,7 +130,9 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
 		log.Warn("Sanitizing invalid miner gas price", "provided", config.Miner.GasPrice, "updated", ethconfig.Defaults.Miner.GasPrice)
 		config.Miner.GasPrice = new(big.Int).Set(ethconfig.Defaults.Miner.GasPrice)
 	}
-	if config.NoPruning && config.TrieDirtyCache > 0 {
+	// Redistribute memory allocation from in-memory trie node garbage collection
+	// to other caches when an archive node is requested.
+	if config.StateScheme == rawdb.HashScheme && config.NoPruning && config.TrieDirtyCache > 0 {
 		if config.SnapshotCache > 0 {
 			config.TrieCleanCache += config.TrieDirtyCache * 3 / 5
 			config.SnapshotCache += config.TrieDirtyCache * 2 / 5
@@ -138,6 +141,13 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
 		}
 		config.TrieDirtyCache = 0
 	}
+	// Optimize memory distribution by reallocating surplus allowance from the
+	// dirty cache to the clean cache.
+	if config.StateScheme == rawdb.PathScheme && config.TrieDirtyCache > pathdb.MaxDirtyBufferSize/1024/1024 {
+		log.Info("Capped dirty cache size", "provided", common.StorageSize(config.TrieDirtyCache)*1024*1024, "adjusted", common.StorageSize(pathdb.MaxDirtyBufferSize))
+		config.TrieCleanCache += config.TrieDirtyCache - pathdb.MaxDirtyBufferSize/1024/1024
+		config.TrieDirtyCache = pathdb.MaxDirtyBufferSize / 1024 / 1024
+	}
 	log.Info("Allocated trie memory caches", "clean", common.StorageSize(config.TrieCleanCache)*1024*1024, "dirty", common.StorageSize(config.TrieDirtyCache)*1024*1024)
 
 	// Assemble the Ethereum object
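The new PathScheme branch caps config.TrieDirtyCache at MaxDirtyBufferSize (256 MiB) and hands the surplus to the clean cache rather than discarding it. A minimal standalone sketch of that arithmetic, with hypothetical cache values in MiB (the real code operates on ethconfig.Config fields and logs through log.Info):

```go
package main

import "fmt"

// maxDirtyMB mirrors pathdb.MaxDirtyBufferSize / 1024 / 1024.
const maxDirtyMB = 256

// capDirtyCache reproduces the reallocation above: any dirty-cache
// allowance beyond the cap is moved to the clean cache instead of
// being silently dropped.
func capDirtyCache(cleanMB, dirtyMB int) (int, int) {
	if dirtyMB > maxDirtyMB {
		cleanMB += dirtyMB - maxDirtyMB
		dirtyMB = maxDirtyMB
	}
	return cleanMB, dirtyMB
}

func main() {
	// Hypothetical tuning that asked for a 600 MiB dirty cache.
	clean, dirty := capDirtyCache(154, 600)
	fmt.Println(clean, dirty) // 498 256
}
```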
2 changes: 1 addition & 1 deletion eth/handler.go
@@ -980,6 +980,6 @@ func (h *handler) voteBroadcastLoop() {
 func (h *handler) enableSyncedFeatures() {
 	h.acceptTxs.Store(true)
 	if h.chain.TrieDB().Scheme() == rawdb.PathScheme {
-		h.chain.TrieDB().SetBufferSize(pathdb.DefaultBufferSize)
+		h.chain.TrieDB().SetBufferSize(pathdb.DefaultDirtyBufferSize)
 	}
 }
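enableSyncedFeatures shrinks the node buffer to DefaultDirtyBufferSize (64 MiB) once the node is in sync; per the constant's doc comment below, the default "is meant to be used once the initial sync is finished". A hedged sketch of that regime switch, assuming (this PR does not show it) that a larger buffer is chosen while syncing:

```go
package main

import "fmt"

const (
	defaultDirtyBufferSize = 64 * 1024 * 1024  // pathdb.DefaultDirtyBufferSize
	maxDirtyBufferSize     = 256 * 1024 * 1024 // pathdb.MaxDirtyBufferSize
)

// syncAwareBufferSize sketches the buffer-size policy implied by
// enableSyncedFeatures: batch writes aggressively while syncing,
// then fall back to the conservative default so flush-induced
// pauses stay short. The sync-time choice is a hypothetical here.
func syncAwareBufferSize(synced bool) int {
	if synced {
		return defaultDirtyBufferSize
	}
	return maxDirtyBufferSize
}

func main() {
	fmt.Println(syncAwareBufferSize(false)) // 268435456 while syncing
	fmt.Println(syncAwareBufferSize(true))  // 67108864 once synced
}
```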
22 changes: 11 additions & 11 deletions trie/triedb/pathdb/database.go
@@ -40,18 +40,18 @@ const (
 	// defaultCleanSize is the default memory allowance of clean cache.
 	defaultCleanSize = 16 * 1024 * 1024
 
-	// maxBufferSize is the maximum memory allowance of node buffer.
+	// MaxDirtyBufferSize is the maximum memory allowance of node buffer.
 	// Too large nodebuffer will cause the system to pause for a long
 	// time when write happens. Also, the largest batch that pebble can
 	// support is 4GB, node will panic if batch size exceeds this limit.
-	maxBufferSize = 256 * 1024 * 1024
+	MaxDirtyBufferSize = 256 * 1024 * 1024
 
-	// DefaultBufferSize is the default memory allowance of node buffer
+	// DefaultDirtyBufferSize is the default memory allowance of node buffer
 	// that aggregates the writes from above until it's flushed into the
 	// disk. It's meant to be used once the initial sync is finished.
 	// Do not increase the buffer size arbitrarily, otherwise the system
 	// pause time will increase when the database writes happen.
-	DefaultBufferSize = 64 * 1024 * 1024
+	DefaultDirtyBufferSize = 64 * 1024 * 1024
 )
 
 // layer is the interface implemented by all state layers which includes some
@@ -96,9 +96,9 @@ type Config struct {
 // unreasonable or unworkable.
 func (c *Config) sanitize() *Config {
 	conf := *c
-	if conf.DirtyCacheSize > maxBufferSize {
-		log.Warn("Sanitizing invalid node buffer size", "provided", common.StorageSize(conf.DirtyCacheSize), "updated", common.StorageSize(maxBufferSize))
-		conf.DirtyCacheSize = maxBufferSize
+	if conf.DirtyCacheSize > MaxDirtyBufferSize {
+		log.Warn("Sanitizing invalid node buffer size", "provided", common.StorageSize(conf.DirtyCacheSize), "updated", common.StorageSize(MaxDirtyBufferSize))
+		conf.DirtyCacheSize = MaxDirtyBufferSize
 	}
 	return &conf
 }
@@ -107,7 +107,7 @@ func (c *Config) sanitize() *Config {
 var Defaults = &Config{
 	StateHistory:   params.FullImmutabilityThreshold,
 	CleanCacheSize: defaultCleanSize,
-	DirtyCacheSize: DefaultBufferSize,
+	DirtyCacheSize: DefaultDirtyBufferSize,
 }
 
 // ReadOnly is the config in order to open database in read only mode.
@@ -413,9 +413,9 @@ func (db *Database) SetBufferSize(size int) error {
 	db.lock.Lock()
 	defer db.lock.Unlock()
 
-	if size > maxBufferSize {
-		log.Info("Capped node buffer size", "provided", common.StorageSize(size), "adjusted", common.StorageSize(maxBufferSize))
-		size = maxBufferSize
+	if size > MaxDirtyBufferSize {
+		log.Info("Capped node buffer size", "provided", common.StorageSize(size), "adjusted", common.StorageSize(MaxDirtyBufferSize))
+		size = MaxDirtyBufferSize
 	}
 	db.bufferSize = size
 	return db.tree.bottom().setBufferSize(db.bufferSize)
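The point of the rename is visibility: exporting the cap as MaxDirtyBufferSize lets eth/backend.go clamp the configured dirty cache before the database is even constructed. For orientation, a hedged usage sketch built only from entry points visible in this diff (pathdb.New as invoked by the tests below, SetBufferSize as above); treat it as illustrative for this fork at the time of the PR, not a supported API:

```go
package main

import (
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/trie/triedb/pathdb"
)

func main() {
	// Defaults uses DefaultDirtyBufferSize (64 MiB) for the dirty cache.
	db := pathdb.New(rawdb.NewMemoryDatabase(), pathdb.Defaults)

	// Asking for more than the cap is not applied verbatim: SetBufferSize
	// clamps the value to MaxDirtyBufferSize and logs the adjustment.
	_ = db.SetBufferSize(2 * pathdb.MaxDirtyBufferSize)

	// Once synced, a node drops back to the conservative default.
	_ = db.SetBufferSize(pathdb.DefaultDirtyBufferSize)
}
```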
51 changes: 42 additions & 9 deletions trie/triedb/pathdb/database_test.go
@@ -96,11 +96,15 @@ type tester struct {
 	snapStorages map[common.Hash]map[common.Hash]map[common.Hash][]byte
 }
 
-func newTester(t *testing.T) *tester {
+func newTester(t *testing.T, historyLimit uint64) *tester {
 	var (
 		disk, _ = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false, false, false, false)
-		db      = New(disk, &Config{CleanCacheSize: 256 * 1024, DirtyCacheSize: 256 * 1024})
-		obj     = &tester{
+		db      = New(disk, &Config{
+			StateHistory:   historyLimit,
+			CleanCacheSize: 256 * 1024,
+			DirtyCacheSize: 256 * 1024,
+		})
+		obj = &tester{
 			db:        db,
 			preimages: make(map[common.Hash]common.Address),
 			accounts:  make(map[common.Hash][]byte),
@@ -376,7 +380,7 @@ func (t *tester) bottomIndex() int {
 
 func TestDatabaseRollback(t *testing.T) {
 	// Verify state histories
-	tester := newTester(t)
+	tester := newTester(t, 0)
 	defer tester.release()
 
 	if err := tester.verifyHistory(); err != nil {
@@ -402,7 +406,7 @@ func TestDatabaseRollback(t *testing.T) {
 
 func TestDatabaseRecoverable(t *testing.T) {
 	var (
-		tester = newTester(t)
+		tester = newTester(t, 0)
 		index  = tester.bottomIndex()
 	)
 	defer tester.release()
@@ -441,7 +445,7 @@ func TestDatabaseRecoverable(t *testing.T) {
 
 func TestReset(t *testing.T) {
 	var (
-		tester = newTester(t)
+		tester = newTester(t, 0)
 		index  = tester.bottomIndex()
 	)
 	defer tester.release()
@@ -475,7 +479,7 @@ func TestReset(t *testing.T) {
 }
 
 func TestCommit(t *testing.T) {
-	tester := newTester(t)
+	tester := newTester(t, 0)
 	defer tester.release()
 
 	if err := tester.db.Commit(tester.lastHash(), false); err != nil {
@@ -499,7 +503,7 @@ func TestCommit(t *testing.T) {
 }
 
 func TestJournal(t *testing.T) {
-	tester := newTester(t)
+	tester := newTester(t, 0)
 	defer tester.release()
 
 	if err := tester.db.Journal(tester.lastHash()); err != nil {
@@ -523,7 +527,7 @@ func TestJournal(t *testing.T) {
 }
 
 func TestCorruptedJournal(t *testing.T) {
-	tester := newTester(t)
+	tester := newTester(t, 0)
 	defer tester.release()
 
 	if err := tester.db.Journal(tester.lastHash()); err != nil {
@@ -552,6 +556,35 @@ func TestCorruptedJournal(t *testing.T) {
 	}
 }
 
+// TestTailTruncateHistory function is designed to test a specific edge case where,
+// when history objects are removed from the end, it should trigger a state flush
+// if the ID of the new tail object is even higher than the persisted state ID.
+//
+// For example, let's say the ID of the persistent state is 10, and the current
+// history objects range from ID(5) to ID(15). As we accumulate six more objects,
+// the history will expand to cover ID(11) to ID(21). ID(11) then becomes the
+// oldest history object, and its ID is even higher than the stored state.
+//
+// In this scenario, it is mandatory to update the persistent state before
+// truncating the tail histories. This ensures that the ID of the persistent state
+// always falls within the range of [oldest-history-id, latest-history-id].
+func TestTailTruncateHistory(t *testing.T) {
+	tester := newTester(t, 10)
+	defer tester.release()
+
+	tester.db.Close()
+	tester.db = New(tester.db.diskdb, &Config{StateHistory: 10})
+
+	head, err := tester.db.freezer.Ancients()
+	if err != nil {
+		t.Fatalf("Failed to obtain freezer head")
+	}
+	stored := rawdb.ReadPersistentStateID(tester.db.diskdb)
+	if head != stored {
+		t.Fatalf("Failed to truncate excess history object above, stored: %d, head: %d", stored, head)
+	}
+}
+
 // copyAccounts returns a deep-copied account set of the provided one.
 func copyAccounts(set map[common.Hash][]byte) map[common.Hash][]byte {
 	copied := make(map[common.Hash][]byte, len(set))
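The invariant this new test pins down is that the persisted state ID must stay inside the retained history window. A small self-contained sketch of that range check, reusing the numbers from the test's doc comment (the function name is hypothetical):

```go
package main

import "fmt"

// invariantHolds reports whether the persisted state ID sits inside the
// retained history window [oldestID, latestID]; TestTailTruncateHistory
// asserts the same thing by comparing the freezer head against the
// persistent state ID after a restart.
func invariantHolds(persisted, oldestID, latestID uint64) bool {
	return persisted >= oldestID && persisted <= latestID
}

func main() {
	// Scenario from the doc comment: histories grew to 11..21 while the
	// persisted state is still 10, so a flush must happen before pruning.
	fmt.Println(invariantHolds(10, 11, 21)) // false: forced flush required
	// After the forced flush and tail truncation (limit 10 keeps 12..21).
	fmt.Println(invariantHolds(21, 12, 21)) // true
}
```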
2 changes: 1 addition & 1 deletion trie/triedb/pathdb/difflayer_test.go
@@ -29,7 +29,7 @@ import (
 func emptyLayer() *diskLayer {
 	return &diskLayer{
 		db:     New(rawdb.NewMemoryDatabase(), nil),
-		buffer: newNodeBuffer(DefaultBufferSize, nil, 0),
+		buffer: newNodeBuffer(DefaultDirtyBufferSize, nil, 0),
 	}
 }
 
58 changes: 43 additions & 15 deletions trie/triedb/pathdb/disklayer.go
@@ -172,37 +172,65 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
 	dl.lock.Lock()
 	defer dl.lock.Unlock()
 
-	// Construct and store the state history first. If crash happens
-	// after storing the state history but without flushing the
-	// corresponding states(journal), the stored state history will
-	// be truncated in the next restart.
+	// Construct and store the state history first. If crash happens after storing
+	// the state history but without flushing the corresponding states(journal),
+	// the stored state history will be truncated from head in the next restart.
+	var (
+		overflow bool
+		oldest   uint64
+	)
 	if dl.db.freezer != nil {
-		err := writeHistory(dl.db.diskdb, dl.db.freezer, bottom, dl.db.config.StateHistory)
+		err := writeHistory(dl.db.freezer, bottom)
 		if err != nil {
 			return nil, err
 		}
+		// Determine if the persisted history object has exceeded the configured
+		// limitation, set the overflow as true if so.
+		tail, err := dl.db.freezer.Tail()
+		if err != nil {
+			return nil, err
+		}
+		limit := dl.db.config.StateHistory
+		if limit != 0 && bottom.stateID()-tail > limit {
+			overflow = true
+			oldest = bottom.stateID() - limit + 1 // track the id of history **after truncation**
+		}
 	}
 	// Mark the diskLayer as stale before applying any mutations on top.
 	dl.stale = true
 
-	// Store the root->id lookup afterwards. All stored lookups are
-	// identified by the **unique** state root. It's impossible that
-	// in the same chain blocks are not adjacent but have the same
-	// root.
+	// Store the root->id lookup afterwards. All stored lookups are identified
+	// by the **unique** state root. It's impossible that in the same chain
	// blocks are not adjacent but have the same root.
 	if dl.id == 0 {
 		rawdb.WriteStateID(dl.db.diskdb, dl.root, 0)
 	}
 	rawdb.WriteStateID(dl.db.diskdb, bottom.rootHash(), bottom.stateID())
 
-	// Construct a new disk layer by merging the nodes from the provided
-	// diff layer, and flush the content in disk layer if there are too
-	// many nodes cached. The clean cache is inherited from the original
-	// disk layer for reusing.
+	// Construct a new disk layer by merging the nodes from the provided diff
+	// layer, and flush the content in disk layer if there are too many nodes
+	// cached. The clean cache is inherited from the original disk layer.
 	ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.cleans, dl.buffer.commit(bottom.nodes))
-	err := ndl.buffer.flush(ndl.db.diskdb, ndl.cleans, ndl.id, force)
-	if err != nil {
+
+	// In a unique scenario where the ID of the oldest history object (after tail
+	// truncation) surpasses the persisted state ID, we take the necessary action
+	// of forcibly committing the cached dirty nodes to ensure that the persisted
+	// state ID remains higher.
+	if !force && rawdb.ReadPersistentStateID(dl.db.diskdb) < oldest {
+		force = true
+	}
+	if err := ndl.buffer.flush(ndl.db.diskdb, ndl.cleans, ndl.id, force); err != nil {
 		return nil, err
 	}
+	// To remove outdated history objects from the end, we set the 'tail' parameter
+	// to 'oldest-1' due to the offset between the freezer index and the history ID.
+	if overflow {
+		pruned, err := truncateFromTail(ndl.db.diskdb, ndl.db.freezer, oldest-1)
+		if err != nil {
+			return nil, err
+		}
+		log.Debug("Pruned state history", "items", pruned, "tailid", oldest)
+	}
 	return ndl, nil
 }
 
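To make the overflow bookkeeping concrete, here is a self-contained sketch of the arithmetic commit now performs, with hypothetical IDs (a 10-entry retention limit and histories reaching ID 21):

```go
package main

import "fmt"

// historyOverflow mirrors the check in diskLayer.commit: given the freezer
// tail, the state ID of the layer being committed and the configured
// retention limit, report whether old histories must be pruned and, if so,
// the ID of the oldest history that survives truncation.
func historyOverflow(tail, stateID, limit uint64) (overflow bool, oldest uint64) {
	if limit != 0 && stateID-tail > limit {
		return true, stateID - limit + 1
	}
	return false, 0
}

func main() {
	// Histories cover ID(5)..ID(21); history ID i lives at freezer index
	// i-1, so the freezer tail is 4. With limit 10:
	overflow, oldest := historyOverflow(4, 21, 10)
	fmt.Println(overflow, oldest) // true 12: IDs 12..21 are retained

	// truncateFromTail is then called with oldest-1 = 11 because of that
	// one-off offset between freezer indices and history IDs.
}
```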
26 changes: 8 additions & 18 deletions trie/triedb/pathdb/history.go
@@ -512,38 +512,28 @@ func readHistory(freezer *rawdb.ResettableFreezer, id uint64) (*history, error)
 	return &dec, nil
 }
 
-// writeHistory writes the state history with provided state set. After
-// storing the corresponding state history, it will also prune the stale
-// histories from the disk with the given threshold.
-func writeHistory(db ethdb.KeyValueStore, freezer *rawdb.ResettableFreezer, dl *diffLayer, limit uint64) error {
+// writeHistory persists the state history with the provided state set.
+func writeHistory(freezer *rawdb.ResettableFreezer, dl *diffLayer) error {
 	// Short circuit if state set is not available.
 	if dl.states == nil {
 		return errors.New("state change set is not available")
 	}
 	var (
-		err   error
-		n     int
-		start = time.Now()
-		h     = newHistory(dl.rootHash(), dl.parentLayer().rootHash(), dl.block, dl.states)
+		start   = time.Now()
+		history = newHistory(dl.rootHash(), dl.parentLayer().rootHash(), dl.block, dl.states)
 	)
-	accountData, storageData, accountIndex, storageIndex := h.encode()
+	accountData, storageData, accountIndex, storageIndex := history.encode()
 	dataSize := common.StorageSize(len(accountData) + len(storageData))
 	indexSize := common.StorageSize(len(accountIndex) + len(storageIndex))
 
 	// Write history data into five freezer table respectively.
-	rawdb.WriteStateHistory(freezer, dl.stateID(), h.meta.encode(), accountIndex, storageIndex, accountData, storageData)
+	rawdb.WriteStateHistory(freezer, dl.stateID(), history.meta.encode(), accountIndex, storageIndex, accountData, storageData)
 
-	// Prune stale state histories based on the config.
-	if limit != 0 && dl.stateID() > limit {
-		n, err = truncateFromTail(db, freezer, dl.stateID()-limit)
-		if err != nil {
-			return err
-		}
-	}
 	historyDataBytesMeter.Mark(int64(dataSize))
 	historyIndexBytesMeter.Mark(int64(indexSize))
 	historyBuildTimeMeter.UpdateSince(start)
-	log.Debug("Stored state history", "id", dl.stateID(), "block", dl.block, "data", dataSize, "index", indexSize, "pruned", n, "elapsed", common.PrettyDuration(time.Since(start)))
+	log.Debug("Stored state history", "id", dl.stateID(), "block", dl.block, "data", dataSize, "index", indexSize, "elapsed", common.PrettyDuration(time.Since(start)))
 
 	return nil
 }