perf: op-node related api improvement
This PR optimizes the APIs that op-geth exposes to op-node during the block issuance process. It is divided into two parts:
1. Improvements to prepareWork:
  - Added block and receipt caches that are populated after block import
  - Improved the getBlockByHash API to avoid redundant loops
2. Improvements to newPayload:
  - The state is now saved in advance after sealing the block, so the block can be written more efficiently once op-node confirms it.

In summary, this PR optimizes the API interaction between op-node and op-geth during block issuance, mainly by:
1. Caching blocks and receipts after block import to avoid redundant queries
2. Optimizing the getBlockByHash API to reduce repetitive loops
3. Saving the state in advance after sealing a block so that it can be written more efficiently once op-node confirms it (see the sketch after this list).
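
As a rough illustration of item 3, here is a minimal sketch of how the sealing path could hand its results to the new mining caches; the helper and its name (cacheSealedWork) are hypothetical and not part of this PR, only the CacheMining* methods shown in core/blockchain.go below are.

package miner

import (
	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/core/state"
	"github.com/ethereum/go-ethereum/core/types"
)

// cacheSealedWork is a hypothetical helper: after a block has been sealed, the
// artifacts produced while building it are stored in the mining caches so that
// a later insertChain call for the same hash can skip re-executing the block.
func cacheSealedWork(bc *core.BlockChain, block *types.Block, receipts types.Receipts, logs []*types.Log, statedb *state.StateDB) {
	hash := block.Hash()
	bc.CacheMiningReceipts(hash, receipts) // receipts from executing the block while mining
	bc.CacheMiningTxLogs(hash, logs)       // logs from executing the block while mining
	bc.CacheMiningState(hash, statedb)     // post-execution state, keyed by the block hash
}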

---------

Co-authored-by: j75689 <[email protected]>
realowen and j75689 committed May 31, 2023
1 parent f80e72b commit 0d9dc40
Showing 6 changed files with 226 additions and 97 deletions.
166 changes: 114 additions & 52 deletions core/blockchain.go
@@ -90,13 +90,16 @@ var (
)

const (
bodyCacheLimit = 256
blockCacheLimit = 256
receiptsCacheLimit = 32
bodyCacheLimit = 512
blockCacheLimit = 512
receiptsCacheLimit = 512
txLogsCacheLimit = 512
txLookupCacheLimit = 1024
maxFutureBlocks = 256
maxTimeFutureBlocks = 30
TriesInMemory = 128 // default number of recent trie roots to keep in memory

miningStateCacheLimit = 128
TriesInMemory = 128 // default number of recent trie roots to keep in memory

// BlockChainVersion ensures that an incompatible database forces a resync from scratch.
//
@@ -210,6 +213,11 @@ type BlockChain struct {
bodyRLPCache *lru.Cache[common.Hash, rlp.RawValue]
receiptsCache *lru.Cache[common.Hash, []*types.Receipt]
blockCache *lru.Cache[common.Hash, *types.Block]

miningReceiptsCache *lru.Cache[common.Hash, []*types.Receipt]
miningTxLogsCache *lru.Cache[common.Hash, []*types.Log]
miningStateCache *lru.Cache[common.Hash, *state.StateDB]

txLookupCache *lru.Cache[common.Hash, *rawdb.LegacyTxLookupEntry]

// future blocks are blocks added for later processing
@@ -266,25 +274,29 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
}

bc := &BlockChain{
chainConfig: chainConfig,
cacheConfig: cacheConfig,
db: db,
triedb: triedb,
flushInterval: int64(cacheConfig.TrieTimeLimit),
triegc: prque.New[int64, common.Hash](nil),
quit: make(chan struct{}),
chainmu: syncx.NewClosableMutex(),
bodyCache: lru.NewCache[common.Hash, *types.Body](bodyCacheLimit),
bodyRLPCache: lru.NewCache[common.Hash, rlp.RawValue](bodyCacheLimit),
receiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit),
blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit),
txLookupCache: lru.NewCache[common.Hash, *rawdb.LegacyTxLookupEntry](txLookupCacheLimit),
futureBlocks: lru.NewCache[common.Hash, *types.Block](maxFutureBlocks),
engine: engine,
vmConfig: vmConfig,
chainConfig: chainConfig,
cacheConfig: cacheConfig,
db: db,
triedb: triedb,
flushInterval: int64(cacheConfig.TrieTimeLimit),
triegc: prque.New[int64, common.Hash](nil),
quit: make(chan struct{}),
chainmu: syncx.NewClosableMutex(),
bodyCache: lru.NewCache[common.Hash, *types.Body](bodyCacheLimit),
bodyRLPCache: lru.NewCache[common.Hash, rlp.RawValue](bodyCacheLimit),
receiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit),
blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit),
miningReceiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit),
miningTxLogsCache: lru.NewCache[common.Hash, []*types.Log](txLogsCacheLimit),
miningStateCache: lru.NewCache[common.Hash, *state.StateDB](miningStateCacheLimit),
txLookupCache: lru.NewCache[common.Hash, *rawdb.LegacyTxLookupEntry](txLookupCacheLimit),
futureBlocks: lru.NewCache[common.Hash, *types.Block](maxFutureBlocks),
engine: engine,
vmConfig: vmConfig,
}
bc.forker = NewForkChoice(bc, shouldPreserve)
bc.stateCache = state.NewDatabaseWithNodeDBAndCache(bc.db, bc.triedb)

bc.validator = NewBlockValidator(chainConfig, bc, engine)
bc.prefetcher = NewStatePrefetcher(chainConfig, bc, engine)
bc.processor = NewStateProcessor(chainConfig, bc, engine)
@@ -772,6 +784,9 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
bc.bodyRLPCache.Purge()
bc.receiptsCache.Purge()
bc.blockCache.Purge()
bc.miningReceiptsCache.Purge()
bc.miningTxLogsCache.Purge()
bc.miningStateCache.Purge()
bc.txLookupCache.Purge()
bc.futureBlocks.Purge()

@@ -1034,6 +1049,33 @@ func (bc *BlockChain) procFutureBlocks() {
}
}

// CacheBlock caches a block in memory
func (bc *BlockChain) CacheBlock(hash common.Hash, block *types.Block) {
bc.hc.numberCache.Add(hash, block.NumberU64())
bc.hc.headerCache.Add(hash, block.Header())
bc.blockCache.Add(hash, block)
}

// CacheReceipts caches receipts in memory
func (bc *BlockChain) CacheReceipts(hash common.Hash, receipts types.Receipts) {
bc.receiptsCache.Add(hash, receipts)
}

// CacheMiningReceipts caches receipts from mining work in memory
func (bc *BlockChain) CacheMiningReceipts(hash common.Hash, receipts types.Receipts) {
bc.miningReceiptsCache.Add(hash, receipts)
}

// CacheMiningTxLogs caches tx logs from mining work in memory
func (bc *BlockChain) CacheMiningTxLogs(hash common.Hash, logs []*types.Log) {
bc.miningTxLogsCache.Add(hash, logs)
}

// CacheMiningState caches the mining state in memory
func (bc *BlockChain) CacheMiningState(hash common.Hash, state *state.StateDB) {
bc.miningStateCache.Add(hash, state)
}

// WriteStatus status of write
type WriteStatus byte

@@ -1765,46 +1807,58 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
continue
}

// Retrieve the parent block and its state to execute on top
start := time.Now()
parent := it.previous()
if parent == nil {
parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
}
statedb, err := state.NewWithSharedPool(parent.Root, bc.stateCache, bc.snaps)
if err != nil {
return it.index, err
}

// Enable prefetching to pull in trie node paths while processing transactions
statedb.StartPrefetcher("chain")
activeState = statedb
var (
err error
receipts, receiptExist = bc.miningReceiptsCache.Get(block.Hash())
logs, logExist = bc.miningTxLogsCache.Get(block.Hash())
statedb, stateExist = bc.miningStateCache.Get(block.Hash())
usedGas = block.GasUsed()
start = time.Now()
)

// If we have a followup block, run that against the current state to pre-cache
// transactions and probabilistically some of the account/storage trie nodes.
interruptCh := make(chan struct{})
if !bc.cacheConfig.TrieCleanNoPrefetch {
if followup, err := it.peek(); followup != nil && err == nil {
throwaway, _ := state.New(parent.Root, bc.stateCache, bc.snaps)
pstart := time.Now()

go func(start time.Time, followup *types.Block, throwaway *state.StateDB) {
bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, interruptCh)
// skip block processing if we already have the state, receipts and logs from the mining work
if !(receiptExist && logExist && stateExist) {
// Retrieve the parent block and its state to execute on top
parent := it.previous()
if parent == nil {
parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
}
statedb, err = state.NewWithSharedPool(parent.Root, bc.stateCache, bc.snaps)
if err != nil {
return it.index, err
}

// Enable prefetching to pull in trie node paths while processing transactions
statedb.StartPrefetcher("chain")
activeState = statedb

// If we have a followup block, run that against the current state to pre-cache
// transactions and probabilistically some of the account/storage trie nodes.

blockPrefetchExecuteTimer.Update(time.Since(start))
}(time.Now(), followup, throwaway)
if !bc.cacheConfig.TrieCleanNoPrefetch {
if followup, err := it.peek(); followup != nil && err == nil {
throwaway, _ := state.New(parent.Root, bc.stateCache, bc.snaps)

go func(start time.Time, followup *types.Block, throwaway *state.StateDB) {
bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, interruptCh)

blockPrefetchExecuteTimer.Update(time.Since(start))
}(time.Now(), followup, throwaway)
}
}
// Process block using the parent state as reference point
receipts, logs, usedGas, err = bc.processor.Process(block, statedb, bc.vmConfig)
if err != nil {
bc.reportBlock(block, receipts, err)
close(interruptCh)
return it.index, err
}
}

// Process block using the parent state as reference point
pstart := time.Now()
receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
if err != nil {
bc.reportBlock(block, receipts, err)
close(interruptCh)
return it.index, err
}
ptime := time.Since(pstart)

vstart := time.Now()
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
bc.reportBlock(block, receipts, err)
@@ -1814,6 +1868,14 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
vtime := time.Since(vstart)
proctime := time.Since(start) // processing + validation

// pre-cache the block and receipts so that they can be retrieved quickly over RPC
bc.CacheBlock(block.Hash(), block)
err = types.Receipts(receipts).DeriveFields(bc.chainConfig, block.Hash(), block.NumberU64(), block.Time(), block.BaseFee(), block.Transactions())
if err != nil {
log.Warn("Failed to derive receipt fields", "block", block.Hash(), "err", err)
}
bc.CacheReceipts(block.Hash(), receipts)

// Update the metrics touched during block processing and validation
accountReadTimer.Update(statedb.AccountReads) // Account reads are complete(in processing)
storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete(in processing)
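For context, a minimal sketch (an assumed caller pattern, not part of this diff) of what the pre-caching in insertChain buys: once CacheBlock and CacheReceipts have been called, block and receipt reads for that hash are served from the in-memory caches rather than from the database.

package example

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/core/types"
)

// readBackCached shows the intended effect of the pre-caching above: after a
// block is imported, getBlockByHash / receipt style RPC reads resolve from
// blockCache and receiptsCache instead of hitting the database again.
func readBackCached(bc *core.BlockChain, hash common.Hash, number uint64) (*types.Block, types.Receipts) {
	block := bc.GetBlock(hash, number)     // cache hit: no rawdb lookup or RLP decode
	receipts := bc.GetReceiptsByHash(hash) // cache hit: receipt fields already derived
	return block, receipts
}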
1 change: 1 addition & 0 deletions ethclient/ethclient.go
@@ -129,6 +129,7 @@ func (ec *Client) getBlock(ctx context.Context, method string, args ...interface
if err := json.Unmarshal(raw, &body); err != nil {
return nil, err
}

// Quick-verify transaction and uncle lists. This mostly helps with debugging the server.
if head.UncleHash == types.EmptyUncleHash && len(body.UncleHashes) > 0 {
return nil, fmt.Errorf("server returned non-empty uncle list but block header indicates no uncles")
38 changes: 18 additions & 20 deletions internal/ethapi/api.go
@@ -1355,19 +1355,19 @@ func RPCMarshalBlock(ctx context.Context, block *types.Block, inclTx bool, fullT
fields["size"] = hexutil.Uint64(block.Size())

if inclTx {
formatTx := func(tx *types.Transaction) (interface{}, error) {
formatTx := func(tx *types.Transaction, index int) (interface{}, error) {
return tx.Hash(), nil
}
if fullTx {
formatTx = func(tx *types.Transaction) (interface{}, error) {
return newRPCTransactionFromBlockHash(ctx, block, tx.Hash(), backend), nil
formatTx = func(tx *types.Transaction, index int) (interface{}, error) {
return newRPCTransactionFromBlockHash(ctx, block, tx.Hash(), index, tx, backend), nil
}
}
txs := block.Transactions()
transactions := make([]interface{}, len(txs))
var err error
for i, tx := range txs {
if transactions[i], err = formatTx(tx); err != nil {
if transactions[i], err = formatTx(tx, i); err != nil {
return nil, err
}
}
@@ -1512,12 +1512,15 @@ func NewRPCPendingTransaction(tx *types.Transaction, current *types.Header, conf
}

// newRPCTransactionFromBlockIndex returns a transaction that will serialize to the RPC representation.
func newRPCTransactionFromBlockIndex(ctx context.Context, b *types.Block, index uint64, backend Backend) *RPCTransaction {
txs := b.Transactions()
if index >= uint64(len(txs)) {
return nil
func newRPCTransactionFromBlockIndex(ctx context.Context, b *types.Block, index uint64, tx *types.Transaction, backend Backend) *RPCTransaction {
if tx == nil {
txs := b.Transactions()
if index >= uint64(len(txs)) {
return nil
}
tx = txs[index]
}
tx := txs[index]

rcpt := depositTxReceipt(ctx, b.Hash(), index, backend, tx)
return newRPCTransaction(tx, b.Hash(), b.NumberU64(), index, b.BaseFee(), backend.ChainConfig(), rcpt)
}
@@ -1547,13 +1550,8 @@ func newRPCRawTransactionFromBlockIndex(b *types.Block, index uint64) hexutil.By
}

// newRPCTransactionFromBlockHash returns a transaction that will serialize to the RPC representation.
func newRPCTransactionFromBlockHash(ctx context.Context, b *types.Block, hash common.Hash, backend Backend) *RPCTransaction {
for idx, tx := range b.Transactions() {
if tx.Hash() == hash {
return newRPCTransactionFromBlockIndex(ctx, b, uint64(idx), backend)
}
}
return nil
func newRPCTransactionFromBlockHash(ctx context.Context, b *types.Block, hash common.Hash, idx int, tx *types.Transaction, backend Backend) *RPCTransaction {
return newRPCTransactionFromBlockIndex(ctx, b, uint64(idx), tx, backend)
}

// accessListResult returns an optional accesslist
@@ -1700,15 +1698,15 @@ func (s *TransactionAPI) GetBlockTransactionCountByHash(ctx context.Context, blo
// GetTransactionByBlockNumberAndIndex returns the transaction for the given block number and index.
func (s *TransactionAPI) GetTransactionByBlockNumberAndIndex(ctx context.Context, blockNr rpc.BlockNumber, index hexutil.Uint) *RPCTransaction {
if block, _ := s.b.BlockByNumber(ctx, blockNr); block != nil {
return newRPCTransactionFromBlockIndex(ctx, block, uint64(index), s.b)
return newRPCTransactionFromBlockIndex(ctx, block, uint64(index), nil, s.b)
}
return nil
}

// GetTransactionByBlockHashAndIndex returns the transaction for the given block hash and index.
func (s *TransactionAPI) GetTransactionByBlockHashAndIndex(ctx context.Context, blockHash common.Hash, index hexutil.Uint) *RPCTransaction {
if block, _ := s.b.BlockByHash(ctx, blockHash); block != nil {
return newRPCTransactionFromBlockIndex(ctx, block, uint64(index), s.b)
return newRPCTransactionFromBlockIndex(ctx, block, uint64(index), nil, s.b)
}
return nil
}
@@ -1910,9 +1908,9 @@ func SubmitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (c

if tx.To() == nil {
addr := crypto.CreateAddress(from, tx.Nonce())
log.Info("Submitted contract creation", "hash", tx.Hash().Hex(), "from", from, "nonce", tx.Nonce(), "contract", addr.Hex(), "value", tx.Value())
log.Debug("Submitted contract creation", "hash", tx.Hash().Hex(), "from", from, "nonce", tx.Nonce(), "contract", addr.Hex(), "value", tx.Value())
} else {
log.Info("Submitted transaction", "hash", tx.Hash().Hex(), "from", from, "nonce", tx.Nonce(), "recipient", tx.To(), "value", tx.Value())
log.Debug("Submitted transaction", "hash", tx.Hash().Hex(), "from", from, "nonce", tx.Nonce(), "recipient", tx.To(), "value", tx.Value())
}
return tx.Hash(), nil
}
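
A toy illustration (not code from the repository) of why threading the index and transaction through formatTx above helps: previously newRPCTransactionFromBlockHash re-scanned block.Transactions() to recover the index for every transaction being marshalled, roughly O(n^2) per block with fullTx enabled; the loop in RPCMarshalBlock already knows both the index and the transaction, so passing them through makes each lookup O(1).

package main

import "fmt"

// findIndex mimics the old newRPCTransactionFromBlockHash: scan for the hash to
// recover its position. Called once per transaction, this is O(n) per call and
// O(n^2) per block.
func findIndex(hashes []string, want string) int {
	for i, h := range hashes {
		if h == want {
			return i
		}
	}
	return -1
}

func main() {
	hashes := []string{"0xaa", "0xbb", "0xcc"}

	// Old pattern: re-derive the index with a scan for every element.
	for _, h := range hashes {
		fmt.Println(findIndex(hashes, h), h)
	}

	// New pattern: the enclosing loop already yields the index, so it is passed
	// straight through with no extra scan per transaction.
	for i, h := range hashes {
		fmt.Println(i, h)
	}
}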