Statediff for full node (#6)
* Open a trie from the in-memory database

* Use a node's LeafKey as an identifier instead of the address

It was proving difficult to look the address up from a given path with a
full node (sometimes the value wouldn't exist in the disk db). So, for now,
we are instead using the node's LeafKey, which is the Keccak256 hash of the
address; if we know the address, we can work out which LeafKey it
corresponds to (see the sketch after this list).

* Make sure that statediff has been processed before pruning

* Use blockchain stateCache.OpenTrie for storage diffs

* Clean up log lines and remove unnecessary fields from builder

* Apply go fmt changes

* Add a sleep to the blockchain test

* Address PR comments

* Address PR comments
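
Regarding the LeafKey bullet above, here is a minimal, hypothetical sketch of the mapping it relies on; it is not part of this diff, and leafKeyForAddress is an illustrative name. It assumes, as the message describes, that a LeafKey is the Keccak256 hash of the account address, so a known address can be matched to the LeafKey reported in a statediff using go-ethereum's crypto package.

package sketch

import (
    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/crypto"
)

// leafKeyForAddress derives the LeafKey that a known account address is
// assumed to correspond to: the Keccak256 hash of the address bytes.
func leafKeyForAddress(addr common.Address) common.Hash {
    return crypto.Keccak256Hash(addr.Bytes())
}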
elizabethengelman authored and i-norden committed Jul 17, 2019
1 parent a59f4c5 commit c1e8003
Showing 14 changed files with 242 additions and 99 deletions.
8 changes: 5 additions & 3 deletions cmd/geth/config.go
@@ -152,6 +152,11 @@ func makeFullNode(ctx *cli.Context) *node.Node {
    stack, cfg := makeConfigNode(ctx)
    utils.RegisterEthService(stack, &cfg.Eth)

    if ctx.GlobalBool(utils.StateDiffFlag.Name) {
        cfg.Eth.StateDiff = true
        utils.RegisterStateDiffService(stack, ctx)
    }

    if ctx.GlobalBool(utils.DashboardEnabledFlag.Name) {
        utils.RegisterDashboardService(stack, &cfg.Dashboard, gitCommit)
    }
@@ -179,9 +184,6 @@ func makeFullNode(ctx *cli.Context) *node.Node {
        utils.RegisterEthStatsService(stack, cfg.Ethstats.URL)
    }

    if ctx.GlobalBool(utils.StateDiffFlag.Name) {
        utils.RegisterStateDiffService(stack, ctx)
    }
    return stack
}

60 changes: 44 additions & 16 deletions core/blockchain.go
@@ -111,6 +111,7 @@ type CacheConfig struct {
    TrieDirtyLimit       int           // Memory limit (MB) at which to start flushing dirty trie nodes to disk
    TrieDirtyDisabled    bool          // Whether to disable trie write caching and GC altogether (archive node)
    TrieTimeLimit        time.Duration // Time limit after which to flush the current in-memory trie to disk
    ProcessingStateDiffs bool          // Whether statediff processing should be taken into account before a trie is pruned
}

// BlockChain represents the canonical chain given a database with a genesis
@@ -172,6 +173,8 @@ type BlockChain struct {
    badBlocks       *lru.Cache                     // Bad block cache
    shouldPreserve  func(*types.Block) bool        // Function used to determine whether should preserve the given block.
    terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion.

    stateDiffsProcessed map[common.Hash]int
}

// NewBlockChain returns a fully initialised block chain using information
@@ -191,23 +194,24 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
    blockCache, _ := lru.New(blockCacheLimit)
    futureBlocks, _ := lru.New(maxFutureBlocks)
    badBlocks, _ := lru.New(badBlockLimit)

    stateDiffsProcessed := make(map[common.Hash]int)
    bc := &BlockChain{
        chainConfig: chainConfig,
        cacheConfig: cacheConfig,
        db: db,
        triegc: prque.New(nil),
        stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit),
        quit: make(chan struct{}),
        shouldPreserve: shouldPreserve,
        bodyCache: bodyCache,
        bodyRLPCache: bodyRLPCache,
        receiptsCache: receiptsCache,
        blockCache: blockCache,
        futureBlocks: futureBlocks,
        engine: engine,
        vmConfig: vmConfig,
        badBlocks: badBlocks,
        chainConfig:         chainConfig,
        cacheConfig:         cacheConfig,
        db:                  db,
        triegc:              prque.New(nil),
        stateCache:          state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit),
        quit:                make(chan struct{}),
        shouldPreserve:      shouldPreserve,
        bodyCache:           bodyCache,
        bodyRLPCache:        bodyRLPCache,
        receiptsCache:       receiptsCache,
        blockCache:          blockCache,
        futureBlocks:        futureBlocks,
        engine:              engine,
        vmConfig:            vmConfig,
        badBlocks:           badBlocks,
        stateDiffsProcessed: stateDiffsProcessed,
    }
    bc.validator = NewBlockValidator(chainConfig, bc, engine)
    bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine)
@@ -1243,6 +1247,11 @@ func (bc *BlockChain) writeKnownBlock(block *types.Block) error {
    return nil
}

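// AddToStateDiffProcessedCollection records that a statediff involving the
// given state root has been processed, incrementing that root's count in
// stateDiffsProcessed.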
func (bc *BlockChain) AddToStateDiffProcessedCollection(hash common.Hash) {
    count := bc.stateDiffsProcessed[hash]
    bc.stateDiffsProcessed[hash] = count + 1
}

// WriteBlockWithState writes the block and all associated state to the database.
func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) {
    bc.chainmu.Lock()
@@ -1327,6 +1336,16 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
                    bc.triegc.Push(root, number)
                    break
                }

                if bc.cacheConfig.ProcessingStateDiffs {
                    if !bc.allowedRootToBeDereferenced(root.(common.Hash)) {
                        bc.triegc.Push(root, number)
                        break
                    } else {
                        delete(bc.stateDiffsProcessed, root.(common.Hash))
                    }
                }

                triedb.Dereference(root.(common.Hash))
            }
        }
@@ -1381,6 +1400,15 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
    return status, nil
}

// allowedRootToBeDereferenced returns whether it is safe to dereference a
// state root: since we need the state tries of the current block and its
// parent in memory in order to process statediffs, a root should not be
// dereferenced until its own statediff and its child's statediff have been
// processed.
func (bc *BlockChain) allowedRootToBeDereferenced(root common.Hash) bool {
    diffProcessedForSelfAndChildCount := 2
    count := bc.stateDiffsProcessed[root]
    return count >= diffProcessedForSelfAndChildCount
}

// addFutureBlock checks if the block is within the max allowed window to get
// accepted for future processing, and returns an error if the block is too far
// ahead and was not added.
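For context, a minimal sketch of how a statediff service might drive this accounting; the hook name onStateDiffBuilt and its call site are assumptions, not code from this commit. After a block's diff is processed, both the block's own root and its parent's root are credited, so a root only reaches the count of 2 that allowedRootToBeDereferenced requires once its own diff and its child's diff are done.

package sketch

import (
    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/core"
)

// onStateDiffBuilt is a hypothetical hook invoked after the statediff for the
// block with state root currentRoot (parent state root parentRoot) has been
// processed.
func onStateDiffBuilt(bc *core.BlockChain, parentRoot, currentRoot common.Hash) {
    // Credit the current block's own diff: its count goes to 1.
    bc.AddToStateDiffProcessedCollection(currentRoot)
    // The parent's child diff is now done as well; if the parent's own diff
    // was already counted, its count reaches 2 and writeBlockWithState may
    // dereference and prune it on a later call.
    bc.AddToStateDiffProcessedCollection(parentRoot)
}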
85 changes: 83 additions & 2 deletions core/blockchain_test.go
@@ -2227,8 +2227,8 @@ func BenchmarkBlockChain_1x1000ValueTransferToExisting(b *testing.B) {

func BenchmarkBlockChain_1x1000Executions(b *testing.B) {
    var (
        numTxs    = 1000
        numBlocks = 1
        numTxs    = 1000
        numBlocks = 1
    )
    b.StopTimer()
    b.ResetTimer()
@@ -2241,3 +2241,84 @@ func BenchmarkBlockChain_1x1000Executions(b *testing.B) {
    }
    benchmarkLargeNumberOfValueToNonexisting(b, numTxs, numBlocks, recipientFn, dataFn)
}

func TestProcessingStateDiffs(t *testing.T) {
    defaultTrieCleanCache := 256
    defaultTrieDirtyCache := 256
    defaultTrieTimeout := 60 * time.Minute
    cacheConfig := &CacheConfig{
        TrieDirtyDisabled:    false,
        TrieCleanLimit:       defaultTrieCleanCache,
        TrieDirtyLimit:       defaultTrieDirtyCache,
        TrieTimeLimit:        defaultTrieTimeout,
        ProcessingStateDiffs: true,
    }
    db := rawdb.NewMemoryDatabase()
    genesis := new(Genesis).MustCommit(db)
    numberOfBlocks := TriesInMemory
    engine := ethash.NewFaker()
    blockchain, _ := NewBlockChain(db, cacheConfig, params.AllEthashProtocolChanges, engine, vm.Config{}, nil)
    blocks := makeBlockChain(genesis, numberOfBlocks+1, engine, db, canonicalSeed)
    _, err := blockchain.InsertChain(blocks)
    if err != nil {
        t.Fatalf("failed to create pristine chain: %v", err)
    }
    defer blockchain.Stop()

    // when adding a root hash to the collection, it will increment the count
    firstStateRoot := blocks[0].Root()
    blockchain.AddToStateDiffProcessedCollection(firstStateRoot)
    value, ok := blockchain.stateDiffsProcessed[firstStateRoot]
    if !ok {
        t.Error("state root not found in collection")
    }
    if value != 1 {
        t.Error("state root count not correct", "want", 1, "got", value)
    }

    blockchain.AddToStateDiffProcessedCollection(firstStateRoot)
    value, ok = blockchain.stateDiffsProcessed[firstStateRoot]
    if !ok {
        t.Error("state root not found in collection")
    }
    if value != 2 {
        t.Error("state root count not correct", "want", 2, "got", value)
    }

    moreBlocks := makeBlockChain(blocks[len(blocks)-1], 1, engine, db, canonicalSeed)
    _, err = blockchain.InsertChain(moreBlocks)

    // a root hash can be dereferenced once its state diff and its child's state diff have been processed
    // (i.e. it has a count of 2 in stateDiffsProcessed)
    nodes := blockchain.stateCache.TrieDB().Nodes()
    if containsRootHash(nodes, firstStateRoot) {
        t.Errorf("stateRoot %s in nodes, want: %t, got: %t", firstStateRoot.Hex(), false, true)
    }

    // a root hash should still be in the in-mem db if its child's state diff hasn't yet been processed
    // (i.e. it has a count of 1 in stateDiffsProcessed)
    secondStateRoot := blocks[1].Root()
    blockchain.AddToStateDiffProcessedCollection(secondStateRoot)
    if !containsRootHash(nodes, secondStateRoot) {
        t.Errorf("stateRoot %s in nodes, want: %t, got: %t", secondStateRoot.Hex(), true, false)
    }

    // the stateDiffsProcessed collection is cleaned up once a hash has been dereferenced
    _, ok = blockchain.stateDiffsProcessed[firstStateRoot]
    if ok {
        t.Errorf("stateRoot %s in stateDiffsProcessed collection, want: %t, got: %t",
            firstStateRoot.Hex(),
            false,
            ok,
        )
    }
}

func containsRootHash(collection []common.Hash, hash common.Hash) bool {
    for _, n := range collection {
        if n == hash {
            return true
        }
    }
    return false
}
1 change: 1 addition & 0 deletions eth/backend.go
@@ -184,6 +184,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
            TrieDirtyLimit:       config.TrieDirtyCache,
            TrieDirtyDisabled:    config.NoPruning,
            TrieTimeLimit:        config.TrieTimeout,
            ProcessingStateDiffs: config.StateDiff,
        }
    )
    eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, chainConfig, eth.engine, vmConfig, eth.shouldPreserve)
4 changes: 4 additions & 0 deletions eth/config.go
@@ -61,6 +61,8 @@ var DefaultConfig = Config{
        Blocks:     20,
        Percentile: 60,
    },

    StateDiff: false,
}

func init() {
@@ -149,6 +151,8 @@ type Config struct {
    // RPCGasCap is the global gas cap for eth-call variants.
    RPCGasCap *big.Int `toml:",omitempty"`

    StateDiff bool

    // Checkpoint is a hardcoded checkpoint which can be nil.
    Checkpoint *params.TrustedCheckpoint `toml:",omitempty"`
