trie: parallel insert trie when root node is a full node #597

Open · wants to merge 3 commits into base: master

Changes from all commits
170 changes: 170 additions & 0 deletions core/blockchain_test.go
@@ -17,12 +17,14 @@
package core

import (
"encoding/binary"
"errors"
"fmt"
"io/ioutil"
"math/big"
"math/rand"
"os"
"os/exec"
"sync"
"testing"
"time"
@@ -40,6 +42,7 @@ import (
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/state/snapshot"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
@@ -4312,3 +4315,170 @@ func TestSidecarsPruning(t *testing.T) {
}
}
}

func TestBlockChain_2000StorageUpdate(t *testing.T) {
var (
numTxs = 2000
signer = types.HomesteadSigner{}
testBankKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
testBankAddress = crypto.PubkeyToAddress(testBankKey.PublicKey)
bankFunds = big.NewInt(100000000000000000)
contractAddress = common.HexToAddress("0x1234")
gspec = Genesis{
Config: params.TestChainConfig,
Alloc: GenesisAlloc{
testBankAddress: {Balance: bankFunds},
contractAddress: {
Nonce: 1,
Balance: common.Big0,
// Store 1 into slot passed by calldata
Code: []byte{
byte(vm.PUSH0),
byte(vm.CALLDATALOAD),
byte(vm.PUSH1),
byte(0x1),
byte(vm.SWAP1),
byte(vm.SSTORE),
byte(vm.STOP),
},
Storage: make(map[common.Hash]common.Hash),
},
},
GasLimit: 100e6, // 100 M
}
)

for i := 0; i < 1000; i++ {
gspec.Alloc[contractAddress].Storage[common.BigToHash(big.NewInt(int64(i)))] = common.BigToHash(big.NewInt(0x100))
}

// Generate the original common chain segment and the two competing forks
engine := ethash.NewFaker()
db := rawdb.NewMemoryDatabase()
genesis := gspec.MustCommit(db)

blockGenerator := func(i int, block *BlockGen) {
block.SetCoinbase(common.Address{1})
for txi := 0; txi < numTxs; txi++ {
var calldata [32]byte
binary.BigEndian.PutUint64(calldata[:], uint64(txi))
tx, err := types.SignTx(
types.NewTransaction(uint64(txi), contractAddress, common.Big0, 100_000,
block.header.BaseFee, calldata[:]),
signer,
testBankKey)
if err != nil {
t.Error(err)
}
block.AddTx(tx)
}
}

shared, _ := GenerateChain(params.TestChainConfig, genesis, engine, db, 1, blockGenerator, true)
err := os.Mkdir("./pebble", 0775)
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll("./pebble")
// Import the shared chain and the original canonical one
diskdb, err := rawdb.NewPebbleDBDatabase("./pebble", 1024, 500000, "", false, false)
if err != nil {
t.Fatal(err)
}
defer diskdb.Close()
gspec.MustCommit(diskdb)

chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("failed to create tester chain: %v", err)
}
if _, err := chain.InsertChain(shared, nil); err != nil {
t.Fatalf("failed to insert shared chain: %v", err)
}

blockHash := chain.CurrentBlock().Hash()
expectedHash := common.HexToHash("0x684f656efba5a77f0e8b4c768a2b3479b28250fd7b81dbb9a888abf6180b01bd")
if blockHash != expectedHash {
t.Fatalf("Block hash mismatches, exp %s got %s", expectedHash, blockHash)
}
}

// This benchmark is intended to be run against mainnet data, so the mainnet
// chaindata directory must be available to run it.
func BenchmarkManyStorageUpdate(b *testing.B) {
const (
// Fill in the parent directory of the mainnet chaindata
datadir = ""
numInsert = state.ParallelInsertThreshold + 1
)

var (
diskdb ethdb.Database
err error
axieContract = common.HexToAddress("0x32950db2a7164ae833121501c797d79e7b79d74c")
value = common.HexToHash("0x11")
)
defer func() {
if diskdb != nil {
diskdb.Close()
cmd := exec.Command("../script/overlayfs_chaindata.sh", "-d", datadir, "-c")
if err := cmd.Run(); err != nil {
b.Fatal(err)
}
}
}()

keys := make([]common.Hash, 0, numInsert)
for i := 0; i < numInsert; i++ {
hash := crypto.Keccak256Hash(big.NewInt(int64(i)).Bytes())
keys = append(keys, hash)
}

b.StopTimer()
b.ResetTimer()
for i := 0; i < b.N; i++ {
cmd := exec.Command("../script/overlayfs_chaindata.sh", "-d", datadir)
if err := cmd.Run(); err != nil {
b.Fatal(err)
}

diskdb, err = rawdb.NewPebbleDBDatabase(datadir+"/chaindata", 1024, 500000, "", false, false)
if err != nil {
b.Fatal(err)
}

engine := ethash.NewFaker()
chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}, nil, nil)
if err != nil {
b.Fatalf("failed to create tester chain: %v", err)
}
headBlock := chain.CurrentBlock()

database := state.NewDatabase(diskdb)
snapshot, err := snapshot.New(diskdb, database.TrieDB(), 256, headBlock.Root(), true, true, false)
if err != nil {
b.Fatal(err)
}

statedb, err := state.New(headBlock.Root(), database, snapshot)
if err != nil {
b.Fatal(err)
}

b.StartTimer()
for i := 0; i < numInsert; i++ {
statedb.SetState(axieContract, keys[i], value)
}
_, err = statedb.Commit(true)
if err != nil {
b.Fatal(err)
}
b.StopTimer()

diskdb.Close()
cmd = exec.Command("../script/overlayfs_chaindata.sh", "-d", datadir, "-c")
if err := cmd.Run(); err != nil {
b.Fatal(err)
}
diskdb = nil
}
}
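For local verification, the new test runs with a plain `go test ./core -run TestBlockChain_2000StorageUpdate`; `BenchmarkManyStorageUpdate` additionally assumes a mainnet chaindata copy (filled in via the `datadir` constant) and the `../script/overlayfs_chaindata.sh` helper invoked above.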
24 changes: 23 additions & 1 deletion core/state/state_object.go
@@ -28,10 +28,13 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
)

var emptyCodeHash = crypto.Keccak256(nil)

const ParallelInsertThreshold = 500

type Code []byte

func (c Code) String() string {
@@ -338,6 +341,16 @@ func (s *stateObject) updateTrie(db Database) Trie {
tr := s.getTrie(db)
hasher := s.db.hasher

var (
parallelInsert, ok bool
secureTrie *trie.SecureTrie
keys, values [][]byte
)
if len(s.pendingStorage) > ParallelInsertThreshold {
if secureTrie, ok = tr.(*trie.SecureTrie); ok {
parallelInsert = true
}
}
usedStorage := make([][]byte, 0, len(s.pendingStorage))
for key, value := range s.pendingStorage {
// Skip noop changes, persist actual changes
@@ -353,8 +366,14 @@
} else {
// Encoding []byte cannot fail, ok to ignore the error.
v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:]))
s.setError(tr.TryUpdate(key[:], v))
s.db.StorageUpdated += 1
if parallelInsert {
key := key
keys = append(keys, key[:])
values = append(values, v)
} else {
s.setError(tr.TryUpdate(key[:], v))
}
}
// If state snapshotting is active, cache the data til commit
if s.db.snap != nil {
@@ -369,6 +388,9 @@
}
usedStorage = append(usedStorage, common.CopyBytes(key[:])) // Copy needed for closure
}
if parallelInsert && len(keys) > 0 {
s.setError(secureTrie.TryBatchInsert(keys, values))
}
if s.db.prefetcher != nil {
s.db.prefetcher.used(s.data.Root, usedStorage)
}
22 changes: 22 additions & 0 deletions trie/secure_trie.go
@@ -132,6 +132,28 @@ func (t *SecureTrie) TryUpdate(key, value []byte) error {
return nil
}

// TryBatchInsert batches multiple inserts together.
func (t *SecureTrie) TryBatchInsert(keys, values [][]byte) error {
hashKeys := make([][]byte, 0, len(keys))
for i := range keys {
hk := t.hashKey(keys[i])
// t.hashKey does not return a new slice but a shared internal slice,
// so we must copy it here
hashKeys = append(hashKeys, common.CopyBytes(hk))
}

err := t.trie.TryBatchInsert(hashKeys, values)
if err != nil {
return err
}

for i, hashKey := range hashKeys {
t.getSecKeyCache()[string(hashKey)] = common.CopyBytes(keys[i])
}

return nil
}

// Delete removes any existing value for key from the trie.
func (t *SecureTrie) Delete(key []byte) {
if err := t.TryDelete(key); err != nil {
82 changes: 82 additions & 0 deletions trie/trie.go
@@ -283,6 +283,88 @@ func (t *Trie) TryUpdate(key, value []byte) error {
return nil
}

// TryBatchInsert batches multiple inserts together.
//
// When the resolved root node is a full node, TryBatchInsert splits the
// key/value list by the first nibble of each hex-encoded key and spawns
// multiple goroutines to insert the sublists in parallel.
func (t *Trie) TryBatchInsert(keys, values [][]byte) error {
t.unhashed += len(keys)

var (
resolvedNode node = t.root
err error
)
if node, ok := t.root.(hashNode); ok {
resolvedNode, err = t.resolveHash(node, nil)
if err != nil {
return err
}
}

if fnode, ok := resolvedNode.(*fullNode); ok {
type insertTask struct {
key []byte
value []byte
}

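// A full node has 17 children (16 nibble branches plus a value slot), so
// bucket the pending inserts into 17 task lists keyed by each key's first nibble.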
var insertTasks [17][]insertTask
for i := range keys {
k := keybytesToHex(keys[i])
insertTasks[k[0]] = append(insertTasks[k[0]], insertTask{
key: k,
value: values[i],
})
}

var (
wg sync.WaitGroup
returnedNodes [17]node
errors [17]error
)
wg.Add(17)
for i, tasks := range insertTasks {
go func(index int, tasks []insertTask) {
defer wg.Done()

var err error
taskNode := fnode.Children[index]
for _, task := range tasks {
_, taskNode, err = t.insert(taskNode, []byte{byte(index)}, task.key[1:], valueNode(task.value))
if err != nil {
break
}
}

errors[index] = err
returnedNodes[index] = taskNode
}(i, tasks)
}

wg.Wait()
for _, err := range errors {
if err != nil {
return err
}
}
var newFullNode fullNode
copy(newFullNode.Children[:], returnedNodes[:])
newFullNode.flags = t.newFlag()
t.root = &newFullNode
} else {
for i := range keys {
k := keybytesToHex(keys[i])
_, n, err := t.insert(t.root, nil, k, valueNode(values[i]))
if err != nil {
return err
}
t.root = n
}
}

return nil
}

func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error) {
if len(key) == 0 {
if v, ok := n.(valueNode); ok {
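For reviewers, here is a minimal standalone sketch (not part of this diff) of how the new batch-insert path could be exercised directly. It assumes the `trie.NewSecure` / `trie.NewDatabase` constructors of this go-ethereum vintage and an in-memory backing database; the seed/slot key names are purely illustrative.

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/ethdb/memorydb"
	"github.com/ethereum/go-ethereum/trie"
)

func main() {
	// An empty secure trie over an in-memory node database.
	triedb := trie.NewDatabase(memorydb.New())
	tr, err := trie.NewSecure(common.Hash{}, triedb)
	if err != nil {
		panic(err)
	}

	// Seed a few entries so the root branches into a full node; the batch
	// below can then take the parallel path (otherwise TryBatchInsert simply
	// falls back to sequential inserts, which is still correct).
	for i := 0; i < 16; i++ {
		k := crypto.Keccak256([]byte(fmt.Sprintf("seed-%d", i)))
		if err := tr.TryUpdate(k, []byte{0x01}); err != nil {
			panic(err)
		}
	}

	// Batch-insert enough slots to be representative of the >500-entry case
	// gated by ParallelInsertThreshold in core/state/state_object.go.
	var keys, values [][]byte
	for i := 0; i < 600; i++ {
		keys = append(keys, crypto.Keccak256([]byte(fmt.Sprintf("slot-%d", i))))
		values = append(values, []byte{0x11})
	}
	if err := tr.TryBatchInsert(keys, values); err != nil {
		panic(err)
	}
	fmt.Printf("root: %s\n", tr.Hash())
}
```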