diff --git a/node/cn/api.go b/node/cn/api.go index 344908eba..23548a8bb 100644 --- a/node/cn/api.go +++ b/node/cn/api.go @@ -433,10 +433,12 @@ func (api *PrivateDebugAPI) StorageRangeAt(ctx context.Context, blockHash common if block == nil { return StorageRangeResult{}, fmt.Errorf("block %#x not found", blockHash) } - _, _, _, statedb, err := api.cn.stateAtTransaction(block, txIndex, 0) + _, _, _, statedb, release, err := api.cn.stateAtTransaction(block, txIndex, 0) if err != nil { return StorageRangeResult{}, err } + defer release() + st := statedb.StorageTrie(contractAddress) if st == nil { return StorageRangeResult{}, fmt.Errorf("account %x doesn't exist", contractAddress) diff --git a/node/cn/api_backend.go b/node/cn/api_backend.go index 89e7afdd4..1a89ebaca 100644 --- a/node/cn/api_backend.go +++ b/node/cn/api_backend.go @@ -41,6 +41,7 @@ import ( "github.com/kaiachain/kaia/governance" "github.com/kaiachain/kaia/networks/rpc" "github.com/kaiachain/kaia/node/cn/gasprice" + "github.com/kaiachain/kaia/node/cn/tracers" "github.com/kaiachain/kaia/params" "github.com/kaiachain/kaia/reward" "github.com/kaiachain/kaia/storage/database" @@ -397,11 +398,11 @@ func (b *CNAPIBackend) Engine() consensus.Engine { return b.cn.engine } -func (b *CNAPIBackend) StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, checkLive bool, preferDisk bool) (*state.StateDB, error) { - return b.cn.stateAtBlock(block, reexec, base, checkLive, preferDisk) +func (b *CNAPIBackend) StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, readOnly bool, preferDisk bool) (*state.StateDB, tracers.StateReleaseFunc, error) { + return b.cn.stateAtBlock(block, reexec, base, readOnly, preferDisk) } -func (b *CNAPIBackend) StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (blockchain.Message, vm.BlockContext, vm.TxContext, *state.StateDB, error) { +func (b *CNAPIBackend) StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (blockchain.Message, vm.BlockContext, vm.TxContext, *state.StateDB, tracers.StateReleaseFunc, error) { return b.cn.stateAtTransaction(block, txIndex, reexec) } diff --git a/node/cn/state_accessor.go b/node/cn/state_accessor.go index 4b83e57a9..93a0c5ca3 100644 --- a/node/cn/state_accessor.go +++ b/node/cn/state_accessor.go @@ -30,39 +30,59 @@ import ( "github.com/kaiachain/kaia/blockchain/types" "github.com/kaiachain/kaia/blockchain/vm" "github.com/kaiachain/kaia/common" + "github.com/kaiachain/kaia/node/cn/tracers" "github.com/kaiachain/kaia/reward" statedb2 "github.com/kaiachain/kaia/storage/statedb" ) +// noopReleaser is returned in case there is no operation expected +// for releasing state. +var noopReleaser = tracers.StateReleaseFunc(func() {}) + // stateAtBlock retrieves the state database associated with a certain block. // If no state is locally available for the given block, a number of blocks // are attempted to be reexecuted to generate the desired state. The optional -// base layer statedb can be passed then it's regarded as the statedb of the +// base layer statedb can be provided which is regarded as the statedb of the // parent block. +// +// An additional release function will be returned if the requested state is +// available. Release is expected to be invoked when the returned state is no longer needed. 
+// Its purpose is to prevent resource leaking, though it can be a no-op in some cases. +// // Parameters: -// - block: The block for which we want the state (== state at the stateRoot of the parent) -// - reexec: The maximum number of blocks to reprocess trying to obtain the desired state -// - base: If the caller is tracing multiple blocks, the caller can provide the parent state -// continuously from the callsite. -// - checklive: if true, then the live 'blockchain' state database is used. If the caller want to -// perform Commit or other 'save-to-disk' changes, this should be set to false to avoid -// storing trash persistently -// - preferDisk: this arg can be used by the caller to signal that even though the 'base' is provided, -// it would be preferrable to start from a fresh state, if we have it on disk. -func (cn *CN) stateAtBlock(block *types.Block, reexec uint64, base *state.StateDB, checkLive bool, preferDisk bool) (statedb *state.StateDB, err error) { +// - block: The block for which we want the state (state = block.Root) +// - reexec: The maximum number of blocks to reprocess trying to obtain the desired state +// - base: If the caller is tracing multiple blocks, the caller can provide the parent +// state continuously from the callsite. +// - readOnly: If true, then the live 'blockchain' state database is used. No mutation should +// be made by the caller, e.g. performing Commit or other 'save-to-disk' changes. +// Otherwise, the trash generated by the caller may be persisted permanently. +// - preferDisk: this arg can be used by the caller to signal that even though the 'base' is +// provided, it would be preferable to start from a fresh state, if we have it +// on disk. +func (cn *CN) stateAtBlock(block *types.Block, reexec uint64, base *state.StateDB, readOnly bool, preferDisk bool) (statedb *state.StateDB, release tracers.StateReleaseFunc, err error) { var ( current *types.Block database state.Database report = true origin = block.NumberU64() ) - // Check the live database first if we have the state fully available, use that. - if checkLive { - statedb, err = cn.blockchain.StateAt(block.Root()) - if err == nil { - return statedb, nil + // The state is only for reading purposes; check the state presence in + // the live database. + if readOnly { + // The state is available in the live database, create a reference + // on top to prevent garbage collection and return a release + // function to deref it. + if statedb, err = cn.blockchain.StateAt(block.Root()); err == nil { + statedb.Database().TrieDB().ReferenceRoot(block.Root()) + return statedb, func() { + statedb.Database().TrieDB().Dereference(block.Root()) + }, nil } } + // The state is both for reading and writing, or it's unavailable on disk; + // try to construct/recover the state over an ephemeral trie.Database for + // isolating the live one. if base != nil { if preferDisk { // Create an ephemeral trie.Database for isolating the live one.
Otherwise @@ -70,27 +90,37 @@ func (cn *CN) stateAtBlock(block *types.Block, reexec uint64, base *state.StateD database = state.NewDatabaseWithExistingCache(cn.ChainDB(), cn.blockchain.StateCache().TrieDB().TrieNodeCache()) if statedb, err = state.New(block.Root(), database, nil, nil); err == nil { logger.Info("Found disk backend for state trie", "root", block.Root(), "number", block.Number()) - return statedb, nil + return statedb, noopReleaser, nil } } // The optional base statedb is given, mark the start point as parent block statedb, database, report = base, base.Database(), false current = cn.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1) } else { - // Otherwise try to reexec blocks until we find a state or reach our limit + // Otherwise, try to reexec blocks until we find a state or reach our limit current = block // Create an ephemeral trie.Database for isolating the live one. Otherwise // the internal junks created by tracing will be persisted into the disk. database = state.NewDatabaseWithExistingCache(cn.ChainDB(), cn.blockchain.StateCache().TrieDB().TrieNodeCache()) + // If we didn't check the live database, do check state over ephemeral database, + // otherwise we would rewind past a persisted block (specific corner case is + // chain tracing from the genesis). + if !readOnly { + statedb, err = state.New(current.Root(), database, nil, nil) + if err == nil { + return statedb, noopReleaser, nil + } + } + // Database does not have the state for the given block, try to regenerate for i := uint64(0); i < reexec; i++ { if current.NumberU64() == 0 { - return nil, errors.New("genesis state is missing") + return nil, nil, errors.New("genesis state is missing") } parent := cn.blockchain.GetBlock(current.ParentHash(), current.NumberU64()-1) if parent == nil { - return nil, fmt.Errorf("missing block %v %d", current.ParentHash(), current.NumberU64()-1) + return nil, nil, fmt.Errorf("missing block %v %d", current.ParentHash(), current.NumberU64()-1) } current = parent @@ -102,13 +132,14 @@ func (cn *CN) stateAtBlock(block *types.Block, reexec uint64, base *state.StateD if err != nil { switch err.(type) { case *statedb2.MissingNodeError: - return nil, fmt.Errorf("historical state unavailable. tried regeneration but not possible, possibly due to state migration/pruning or global state saving interval is bigger than reexec value (reexec=%d)", reexec) + return nil, nil, fmt.Errorf("historical state unavailable. tried regeneration but not possible, possibly due to state migration/pruning or global state saving interval is bigger than reexec value (reexec=%d)", reexec) default: - return nil, err + return nil, nil, err } } } - // State was available at historical point, regenerate + // State is available at historical point, re-execute the blocks on top for + // the desired state. var ( start = time.Now() logged time.Time @@ -129,29 +160,30 @@ func (cn *CN) stateAtBlock(block *types.Block, reexec uint64, base *state.StateD } // Quit the state regeneration if time limit exceeds if cn.config.DisableUnsafeDebug && time.Since(start) > cn.config.StateRegenerationTimeLimit { - return nil, fmt.Errorf("this request has queried old states too long since it exceeds the state regeneration time limit(%s)", cn.config.StateRegenerationTimeLimit.String()) + return nil, nil, fmt.Errorf("this request has queried old states too long since it exceeds the state regeneration time limit(%s)", cn.config.StateRegenerationTimeLimit.String()) } // Preload StakingInfo from the current block and state. 
Needed for next block's engine.Finalize() post-Kaia. preloadedStakingBlockNums = append(preloadedStakingBlockNums, current.NumberU64()) if err := reward.PreloadStakingInfoWithState(current.Header(), statedb); err != nil { - return nil, fmt.Errorf("preloading staking info from block %d failed: %v", current.NumberU64(), err) + return nil, nil, fmt.Errorf("preloading staking info from block %d failed: %v", current.NumberU64(), err) } // Retrieve the next block to regenerate and process it next := current.NumberU64() + 1 if current = cn.blockchain.GetBlockByNumber(next); current == nil { - return nil, fmt.Errorf("block #%d not found", next) + return nil, nil, fmt.Errorf("block #%d not found", next) } _, _, _, _, _, err := cn.blockchain.Processor().Process(current, statedb, vm.Config{}) if err != nil { - return nil, fmt.Errorf("processing block %d failed: %v", current.NumberU64(), err) + return nil, nil, fmt.Errorf("processing block %d failed: %v", current.NumberU64(), err) } // Finalize the state so any modifications are written to the trie root, err := statedb.Commit(true) if err != nil { - return nil, err + return nil, nil, err } - if err := statedb.Reset(root); err != nil { - return nil, fmt.Errorf("state reset after block %d failed: %v", current.NumberU64(), err) + statedb, err = state.New(root, database, nil, nil) + if err != nil { + return nil, nil, fmt.Errorf("state reset after block %d failed: %v", current.NumberU64(), err) } database.TrieDB().ReferenceRoot(root) if !common.EmptyHash(parent) { @@ -160,7 +192,7 @@ func (cn *CN) stateAtBlock(block *types.Block, reexec uint64, base *state.StateD err = fmt.Errorf("mistmatching state root block expected %x reexecuted %x", current.Header().Root, root) // Logging here because something went wrong when the state roots disagree even if the execution was successful. logger.Error("incorrectly regenerated historical state", "block", current.NumberU64(), "err", err) - return nil, fmt.Errorf("incorrectly regenerated historical state for block %d: %v", current.NumberU64(), err) + return nil, nil, fmt.Errorf("incorrectly regenerated historical state for block %d: %v", current.NumberU64(), err) } } parent = root @@ -170,28 +202,28 @@ func (cn *CN) stateAtBlock(block *types.Block, reexec uint64, base *state.StateD logger.Info("Historical state regenerated", "block", current.NumberU64(), "elapsed", time.Since(start), "nodes", nodes, "preimages", imgs) } - return statedb, nil + return statedb, func() { database.TrieDB().Dereference(block.Root()) }, nil } // stateAtTransaction returns the execution environment of a certain transaction. -func (cn *CN) stateAtTransaction(block *types.Block, txIndex int, reexec uint64) (blockchain.Message, vm.BlockContext, vm.TxContext, *state.StateDB, error) { +func (cn *CN) stateAtTransaction(block *types.Block, txIndex int, reexec uint64) (blockchain.Message, vm.BlockContext, vm.TxContext, *state.StateDB, tracers.StateReleaseFunc, error) { // Short circuit if it's genesis block. 
if block.NumberU64() == 0 { - return nil, vm.BlockContext{}, vm.TxContext{}, nil, errors.New("no transaction in genesis") + return nil, vm.BlockContext{}, vm.TxContext{}, nil, nil, errors.New("no transaction in genesis") } // Create the parent state database parent := cn.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1) if parent == nil { - return nil, vm.BlockContext{}, vm.TxContext{}, nil, fmt.Errorf("parent %#x not found", block.ParentHash()) + return nil, vm.BlockContext{}, vm.TxContext{}, nil, nil, fmt.Errorf("parent %#x not found", block.ParentHash()) } // Lookup the statedb of parent block from the live database, // otherwise regenerate it on the flight. - statedb, err := cn.stateAtBlock(parent, reexec, nil, true, false) + statedb, release, err := cn.stateAtBlock(parent, reexec, nil, true, false) if err != nil { - return nil, vm.BlockContext{}, vm.TxContext{}, nil, err + return nil, vm.BlockContext{}, vm.TxContext{}, nil, nil, err } if txIndex == 0 && len(block.Transactions()) == 0 { - return nil, vm.BlockContext{}, vm.TxContext{}, statedb, nil + return nil, vm.BlockContext{}, vm.TxContext{}, statedb, release, nil } // Recompute transactions up to the target index. signer := types.MakeSigner(cn.blockchain.Config(), block.Number()) @@ -200,22 +232,22 @@ func (cn *CN) stateAtTransaction(block *types.Block, txIndex int, reexec uint64) msg, err := tx.AsMessageWithAccountKeyPicker(signer, statedb, block.NumberU64()) if err != nil { logger.Warn("stateAtTransition failed", "hash", tx.Hash(), "block", block.NumberU64(), "err", err) - return nil, vm.BlockContext{}, vm.TxContext{}, nil, fmt.Errorf("transaction %#x failed: %v", tx.Hash(), err) + return nil, vm.BlockContext{}, vm.TxContext{}, nil, nil, fmt.Errorf("transaction %#x failed: %v", tx.Hash(), err) } txContext := blockchain.NewEVMTxContext(msg, block.Header(), cn.chainConfig) blockContext := blockchain.NewEVMBlockContext(block.Header(), cn.blockchain, nil) if idx == txIndex { - return msg, blockContext, txContext, statedb, nil + return msg, blockContext, txContext, statedb, release, nil } // Not yet the searched for transaction, execute on top of the current state vmenv := vm.NewEVM(blockContext, txContext, statedb, cn.blockchain.Config(), &vm.Config{}) if _, err := blockchain.ApplyMessage(vmenv, msg); err != nil { - return nil, vm.BlockContext{}, vm.TxContext{}, nil, fmt.Errorf("transaction %#x failed: %v", tx.Hash(), err) + return nil, vm.BlockContext{}, vm.TxContext{}, nil, nil, fmt.Errorf("transaction %#x failed: %v", tx.Hash(), err) } // Ensure any modifications are committed to the state // Since Kaia is forked after EIP158/161 (a.k.a Spurious Dragon), deleting empty object is always effective statedb.Finalise(true, true) } - return nil, vm.BlockContext{}, vm.TxContext{}, nil, fmt.Errorf("transaction index %d out of range for block %#x", txIndex, block.Hash()) + return nil, vm.BlockContext{}, vm.TxContext{}, nil, nil, fmt.Errorf("transaction index %d out of range for block %#x", txIndex, block.Hash()) } diff --git a/node/cn/tracers/api.go b/node/cn/tracers/api.go index f6cd7b3dc..d5ccf7643 100644 --- a/node/cn/tracers/api.go +++ b/node/cn/tracers/api.go @@ -80,6 +80,10 @@ var ( heavyAPIRequestCount int32 = 0 ) +// StateReleaseFunc is used to deallocate resources held by constructing a +// historical state for tracing purposes. +type StateReleaseFunc func() + // Backend interface provides the common API services with access to necessary functions. 
type Backend interface { HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) @@ -94,8 +98,8 @@ type Backend interface { // StateAtBlock returns the state corresponding to the stateroot of the block. // N.B: For executing transactions on block N, the required stateRoot is block N-1, // so this method should be called with the parent. - StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, checkLive bool, preferDisk bool) (*state.StateDB, error) - StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (blockchain.Message, vm.BlockContext, vm.TxContext, *state.StateDB, error) + StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, readOnly bool, preferDisk bool) (*state.StateDB, StateReleaseFunc, error) + StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (blockchain.Message, vm.BlockContext, vm.TxContext, *state.StateDB, StateReleaseFunc, error) } // CommonAPI contains @@ -214,7 +218,7 @@ type txTraceResult struct { type blockTraceTask struct { statedb *state.StateDB // Intermediate state prepped for tracing block *types.Block // Block to trace the transactions from - rootref common.Hash // Trie root reference held for this task + release StateReleaseFunc // The function to release the held resource for this task results []*txTraceResult // Trace results procudes by the task } @@ -274,8 +278,36 @@ func (api *UnsafeAPI) TraceChain(ctx context.Context, start, end rpc.BlockNumber return sub, err } +// releaser is a helper tool responsible for caching the release +// callbacks of tracing state. +type releaser struct { + releases []StateReleaseFunc + lock sync.Mutex +} + +func (r *releaser) add(release StateReleaseFunc) { + r.lock.Lock() + defer r.lock.Unlock() + + r.releases = append(r.releases, release) +} + +func (r *releaser) call() { + r.lock.Lock() + defer r.lock.Unlock() + + for _, release := range r.releases { + release() + } + r.releases = r.releases[:0] +} + // traceChain configures a new tracer according to the provided configuration, and -// executes all the transactions contained within. +// executes all the transactions contained within. The tracing chain range includes +// the end block but excludes the start one. The return value will be one item per +// transaction, dependent on the requested tracer. +// The tracing procedure should be aborted in case the closed signal is received. 
+// // The traceChain operates in two modes: subscription mode and rpc mode // - if notifier and sub is not nil, it works as a subscription mode and returns nothing // - if those parameters are nil, it works as a rpc mode and returns the block trace results, so it can pass the result through rpc-call @@ -297,15 +329,17 @@ func (api *CommonAPI) traceChain(start, end *types.Block, config *TraceConfig, n tasks = make(chan *blockTraceTask, threads) results = make(chan *blockTraceTask, threads) localctx = context.Background() + reler = new(releaser) ) for th := 0; th < threads; th++ { pend.Add(1) go func() { defer pend.Done() - // Fetch and execute the next block trace tasks + // Fetch and execute the block trace tasks for task := range tasks { signer := types.MakeSigner(api.backend.ChainConfig(), task.block.Number()) + blockCtx := blockchain.NewEVMBlockContext(task.block.Header(), newChainContext(localctx, api.backend), nil) // Trace all the transactions contained within for i, tx := range task.block.Transactions() { @@ -317,7 +351,6 @@ func (api *CommonAPI) traceChain(start, end *types.Block, config *TraceConfig, n } txCtx := blockchain.NewEVMTxContext(msg, task.block.Header(), api.backend.ChainConfig()) - blockCtx := blockchain.NewEVMBlockContext(task.block.Header(), newChainContext(localctx, api.backend), nil) res, err := api.traceTx(localctx, msg, blockCtx, txCtx, task.statedb, config) if err != nil { @@ -328,6 +361,10 @@ func (api *CommonAPI) traceChain(start, end *types.Block, config *TraceConfig, n task.statedb.Finalise(true, true) task.results[i] = &txTraceResult{TxHash: tx.Hash(), Result: res} } + // Tracing state is used up, queue it for de-referencing + reler.add(task.release) + + // Stream the result back to the result catcher or abort on teardown if notifier != nil { // Stream the result back to the user or abort on teardown select { @@ -342,22 +379,26 @@ func (api *CommonAPI) traceChain(start, end *types.Block, config *TraceConfig, n }() } // Start a goroutine to feed all the blocks into the tracers - begin := time.Now() go func() { var ( logged time.Time + begin = time.Now() number uint64 traced uint64 failed error - parent common.Hash statedb *state.StateDB + release StateReleaseFunc ) // Ensure everything is properly cleaned up on any exit path defer func() { close(tasks) pend.Wait() + // Clean out any pending derefs. + reler.call() + + // Log the chain result switch { case failed != nil: logger.Warn("Chain tracing failed", "start", start.NumberU64(), "end", end.NumberU64(), "transactions", traced, "elapsed", time.Since(begin), "err", failed) @@ -368,7 +409,7 @@ func (api *CommonAPI) traceChain(start, end *types.Block, config *TraceConfig, n } close(results) }() - var preferDisk bool + // Feed all the blocks both into the tracer, as well as fast process concurrently for number = start.NumberU64(); number < end.NumberU64(); number++ { if notifier != nil { @@ -384,51 +425,48 @@ func (api *CommonAPI) traceChain(start, end *types.Block, config *TraceConfig, n logged = time.Now() logger.Info("Tracing chain segment", "start", start.NumberU64(), "end", end.NumberU64(), "current", number, "transactions", traced, "elapsed", time.Since(begin)) } - // Retrieve the parent state to trace on top + // Retrieve the parent block and target block for tracing. block, err := api.blockByNumber(localctx, rpc.BlockNumber(number)) if err != nil { failed = err break } - // Prepare the statedb for tracing. Don't use the live database for - // tracing to avoid persisting state junks into the database. 
- statedb, err = api.backend.StateAtBlock(localctx, block, reexec, statedb, false, preferDisk) + next, err := api.blockByNumber(localctx, rpc.BlockNumber(number+1)) if err != nil { failed = err break } - if trieDb := statedb.Database().TrieDB(); trieDb != nil { - // Hold the reference for tracer, will be released at the final stage - trieDb.ReferenceRoot(block.Root()) - - // Release the parent state because it's already held by the tracer - if !common.EmptyHash(parent) { - trieDb.Dereference(parent) - } - // Prefer disk if the trie db memory grows too much - s1, s2, s3 := trieDb.Size() - if !preferDisk && (s1+s2+s3) > defaultTracechainMemLimit { - logger.Info("Switching to prefer-disk mode for tracing", "size", s1+s2+s3) - preferDisk = true - } + // Prepare the statedb for tracing. Don't use the live database for + // tracing to avoid persisting state junk into the database. Switch + // over to `preferDisk` mode only if the memory usage exceeds the + // limit; the trie database will be reconstructed from scratch only + // if the relevant state is available on disk. + var preferDisk bool + if statedb != nil { + s1, s2, s3 := statedb.Database().TrieDB().Size() + preferDisk = s1+s2+s3 > defaultTracechainMemLimit } - parent = block.Root() - - next, err := api.blockByNumber(localctx, rpc.BlockNumber(number+1)) + statedb, release, err = api.backend.StateAtBlock(localctx, block, reexec, statedb, false, preferDisk) if err != nil { failed = err break } + // Clean out any pending derefs. Note this step must be done after + // constructing the tracing state, because the tracing state of the + // next block depends on the parent state and construction may fail + // if we release it too early. + reler.call() + // Send the block over to the concurrent tracers (if not in the fast-forward phase) txs := next.Transactions() if notifier != nil { select { - case tasks <- &blockTraceTask{statedb: statedb.Copy(), block: next, rootref: block.Root(), results: make([]*txTraceResult, len(txs))}: + case tasks <- &blockTraceTask{statedb: statedb.Copy(), block: next, release: release, results: make([]*txTraceResult, len(txs))}: case <-notifier.Closed(): return } } else { - tasks <- &blockTraceTask{statedb: statedb.Copy(), block: next, rootref: block.Root(), results: make([]*txTraceResult, len(txs))} + tasks <- &blockTraceTask{statedb: statedb.Copy(), block: next, release: release, results: make([]*txTraceResult, len(txs))} } traced += uint64(len(txs)) } @@ -449,10 +487,6 @@ func (api *CommonAPI) traceChain(start, end *types.Block, config *TraceConfig, n } done[uint64(result.Block)] = result - // Dereference any parent tries held in memory by this task - if res.statedb.Database().TrieDB() != nil { - res.statedb.Database().TrieDB().Dereference(res.rootref) - } if notifier != nil { // Stream completed traces to the user, aborting on the first error for result, ok := done[next]; ok; result, ok = done[next] { @@ -472,6 +506,7 @@ func (api *CommonAPI) traceChain(start, end *types.Block, config *TraceConfig, n } if notifier != nil { + // Keep reading the trace results and stream them to the result channel.
go waitForResult() return nil, nil } @@ -598,10 +633,12 @@ func (api *CommonAPI) traceBlock(ctx context.Context, block *types.Block, config reexec = *config.Reexec } - statedb, err := api.backend.StateAtBlock(ctx, parent, reexec, nil, true, false) + statedb, release, err := api.backend.StateAtBlock(ctx, parent, reexec, nil, true, false) if err != nil { return nil, err } + defer release() + // Execute all the transaction contained within the block concurrently var ( signer = types.MakeSigner(api.backend.ChainConfig(), block.Number()) @@ -695,10 +732,12 @@ func (api *CommonAPI) standardTraceBlockToFile(ctx context.Context, block *types if config != nil && config.Reexec != nil { reexec = *config.Reexec } - statedb, err := api.backend.StateAtBlock(ctx, parent, reexec, nil, true, false) + statedb, release, err := api.backend.StateAtBlock(ctx, parent, reexec, nil, true, false) if err != nil { return nil, err } + defer release() + // Retrieve the tracing configurations, or use default values var ( logConfig vm.LogConfig @@ -811,10 +850,12 @@ func (api *CommonAPI) TraceTransaction(ctx context.Context, hash common.Hash, co if err != nil { return nil, err } - msg, blockCtx, txCtx, statedb, err := api.backend.StateAtTransaction(ctx, block, int(index), reexec) + msg, blockCtx, txCtx, statedb, release, err := api.backend.StateAtTransaction(ctx, block, int(index), reexec) if err != nil { return nil, err } + defer release() + // Trace the transaction and return return api.traceTx(ctx, msg, blockCtx, txCtx, statedb, config) } @@ -850,10 +891,11 @@ func (api *CommonAPI) TraceCall(ctx context.Context, args kaiaapi.CallArgs, bloc if config != nil && config.Reexec != nil { reexec = *config.Reexec } - statedb, err := api.backend.StateAtBlock(ctx, block, reexec, nil, true, false) + statedb, release, err := api.backend.StateAtBlock(ctx, block, reexec, nil, true, false) if err != nil { return nil, err } + defer release() // Execute the trace intrinsicGas, err := types.IntrinsicGas(args.InputData(), nil, args.To == nil, api.backend.ChainConfig().Rules(block.Number())) diff --git a/node/cn/tracers/api_test.go b/node/cn/tracers/api_test.go index 290f6537f..0a808ddde 100644 --- a/node/cn/tracers/api_test.go +++ b/node/cn/tracers/api_test.go @@ -23,11 +23,13 @@ import ( "bytes" "context" "crypto/ecdsa" + "encoding/json" "errors" "fmt" "math/big" "reflect" "sort" + "sync/atomic" "testing" kaiaapi "github.com/kaiachain/kaia/api" @@ -58,6 +60,9 @@ type testBackend struct { engine consensus.Engine chaindb database.DBManager chain *blockchain.BlockChain + + refHook func() // Hook is invoked when the requested state is referenced + relHook func() // Hook is invoked when the requested state is released } func newTestBackend(t *testing.T, n int, gspec *blockchain.Genesis, generator func(i int, b *blockchain.BlockGen)) *testBackend { @@ -144,25 +149,33 @@ func (b *testBackend) ChainDB() database.DBManager { return b.chaindb } -func (b *testBackend) StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, checkLive bool, preferDisk bool) (*state.StateDB, error) { +func (b *testBackend) StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, readOnly bool, preferDisk bool) (*state.StateDB, StateReleaseFunc, error) { statedb, err := b.chain.StateAt(block.Root()) if err != nil { - return nil, errStateNotFound + return nil, nil, errStateNotFound + } + if b.refHook != nil { + b.refHook() + } + release := func() { + if b.relHook != nil { + b.relHook() + } } - 
return statedb, nil + return statedb, release, nil } -func (b *testBackend) StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (blockchain.Message, vm.BlockContext, vm.TxContext, *state.StateDB, error) { +func (b *testBackend) StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (blockchain.Message, vm.BlockContext, vm.TxContext, *state.StateDB, StateReleaseFunc, error) { parent := b.chain.GetBlock(block.ParentHash(), block.NumberU64()-1) if parent == nil { - return nil, vm.BlockContext{}, vm.TxContext{}, nil, errBlockNotFound + return nil, vm.BlockContext{}, vm.TxContext{}, nil, nil, errBlockNotFound } - statedb, err := b.chain.StateAt(parent.Root()) + statedb, release, err := b.StateAtBlock(ctx, parent, reexec, nil, true, false) if err != nil { - return nil, vm.BlockContext{}, vm.TxContext{}, nil, errStateNotFound + return nil, vm.BlockContext{}, vm.TxContext{}, nil, nil, errStateNotFound } if txIndex == 0 && len(block.Transactions()) == 0 { - return nil, vm.BlockContext{}, vm.TxContext{}, statedb, nil + return nil, vm.BlockContext{}, vm.TxContext{}, statedb, release, nil } // Recompute transactions up to the target index. signer := types.MakeSigner(b.chainConfig, block.Number()) @@ -171,15 +184,73 @@ func (b *testBackend) StateAtTransaction(ctx context.Context, block *types.Block txContext := blockchain.NewEVMTxContext(msg, block.Header(), b.chainConfig) blockContext := blockchain.NewEVMBlockContext(block.Header(), b.chain, nil) if idx == txIndex { - return msg, blockContext, txContext, statedb, nil + return msg, blockContext, txContext, statedb, release, nil } vmenv := vm.NewEVM(blockContext, txContext, statedb, b.chainConfig, &vm.Config{Debug: true, EnableInternalTxTracing: true}) if _, err := blockchain.ApplyMessage(vmenv, msg); err != nil { - return nil, vm.BlockContext{}, vm.TxContext{}, nil, fmt.Errorf("transaction %#x failed: %v", tx.Hash(), err) + return nil, vm.BlockContext{}, vm.TxContext{}, nil, nil, fmt.Errorf("transaction %#x failed: %v", tx.Hash(), err) } statedb.Finalise(true, true) } - return nil, vm.BlockContext{}, vm.TxContext{}, nil, fmt.Errorf("transaction index %d out of range for block %#x", txIndex, block.Hash()) + return nil, vm.BlockContext{}, vm.TxContext{}, nil, nil, fmt.Errorf("transaction index %d out of range for block %#x", txIndex, block.Hash()) +} + +func TestTraceChain(t *testing.T) { + // Initialize test accounts + accounts := newAccounts(3) + genesis := &blockchain.Genesis{Alloc: blockchain.GenesisAlloc{ + accounts[0].addr: {Balance: big.NewInt(params.KAIA)}, + accounts[1].addr: {Balance: big.NewInt(params.KAIA)}, + accounts[2].addr: {Balance: big.NewInt(params.KAIA)}, + }} + genBlocks := 50 + signer := types.LatestSignerForChainID(params.TestChainConfig.ChainID) + + var ( + ref uint32 // total refs has made + rel uint32 // total rels has made + nonce uint64 + ) + backend := newTestBackend(t, genBlocks, genesis, func(i int, b *blockchain.BlockGen) { + // Transfer from account[0] to account[1] + // value: 1000 wei + // fee: 0 wei + for j := 0; j < i+1; j++ { + tx, _ := types.SignTx(types.NewTransaction(nonce, accounts[1].addr, big.NewInt(1000), params.TxGas, big.NewInt(0), nil), signer, accounts[0].key) + b.AddTx(tx) + nonce += 1 + } + }) + backend.refHook = func() { atomic.AddUint32(&ref, 1) } + backend.relHook = func() { atomic.AddUint32(&rel, 1) } + api := NewAPI(backend) + + single := `{"gas":21000,"failed":false,"returnValue":"","structLogs":[]}` + cases := []struct { + start 
uint64 + end uint64 + config *TraceConfig + }{ + {0, 50, nil}, // the entire chain range, blocks [1, 50] + {10, 20, nil}, // the middle chain range, blocks [11, 20] + } + for _, c := range cases { + ref, rel = 0, 0 // clean up the counters + + from, _ := api.blockByNumber(context.Background(), rpc.BlockNumber(c.start)) + to, _ := api.blockByNumber(context.Background(), rpc.BlockNumber(c.end)) + ret, err := api.traceChain(from, to, c.config, nil, nil) + assert.NoError(t, err) + + for _, trace := range ret { + for _, txTrace := range trace.Traces { + blob, _ := json.Marshal(txTrace.Result) + if string(blob) != single { + t.Error("Unexpected tracing result") + } + } + } + } } func TestTraceCall(t *testing.T) { diff --git a/tests/state_reexec_test.go b/tests/state_reexec_test.go index 1c243ea3c..7af5f72d4 100644 --- a/tests/state_reexec_test.go +++ b/tests/state_reexec_test.go @@ -159,7 +159,10 @@ func testStateReexec_run(t *testing.T, node *blockchainTestNode, num uint64) { block := node.cn.BlockChain().GetBlockByNumber(num) t.Logf("Regenerating state at block %d", num) - state, err := node.cn.APIBackend.StateAtBlock(context.Background(), block, 10, nil, false, false) + state, release, err := node.cn.APIBackend.StateAtBlock(context.Background(), block, 10, nil, false, false) + if release != nil { + release() + } require.Nil(t, err) // Regenerated state must match the stored block's stateRoot
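A minimal caller-side sketch of the contract this patch introduces: every call that obtains a historical state now also receives a StateReleaseFunc and must invoke it exactly once when the state is no longer needed, mirroring the defer release() calls added in node/cn/api.go and node/cn/tracers/api.go. The helper function and the reexec value below are illustrative only; the Backend interface and the import path are taken from the diff, and the snippet is assumed to live in the node/cn/tracers package.

package tracers

import (
	"context"

	"github.com/kaiachain/kaia/blockchain/types"
)

// traceWithState is a hypothetical caller showing the intended call pattern.
func traceWithState(ctx context.Context, backend Backend, block *types.Block) error {
	// readOnly=true: the live database may serve the state, so the backend
	// takes a reference on the state root and returns a matching releaser.
	statedb, release, err := backend.StateAtBlock(ctx, block, 128, nil, true, false)
	if err != nil {
		return err
	}
	defer release() // drop the reference (a no-op when nothing was referenced)

	_ = statedb // ... run the tracer against statedb here ...
	return nil
}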
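The readOnly fast path in stateAtBlock pairs a ReferenceRoot call with a closure that performs the matching Dereference. The helper below is not part of the patch; it is a hypothetical fragment (assumed to sit in package cn next to state_accessor.go, with import paths matching this repository) that isolates the pairing to show why the release function must be called exactly once.

package cn

import (
	"github.com/kaiachain/kaia/blockchain/state"
	"github.com/kaiachain/kaia/common"
	"github.com/kaiachain/kaia/node/cn/tracers"
)

// refWithRelease takes a reference on root so pruning/garbage collection
// cannot drop the trie nodes while a tracer is reading them, and returns the
// release function that drops exactly that reference.
func refWithRelease(db state.Database, root common.Hash) tracers.StateReleaseFunc {
	db.TrieDB().ReferenceRoot(root)
	return func() {
		db.TrieDB().Dereference(root)
	}
}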
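The releaser helper added to node/cn/tracers/api.go exists because traceChain hands tracing states to worker goroutines and must not dereference a parent state until the child state built on top of it has been constructed. The standalone program below mirrors only the add/call semantics from the diff to make that ordering visible; the package, names, and output are illustrative.

package main

import (
	"fmt"
	"sync"
)

type StateReleaseFunc func()

// releaser caches release callbacks and flushes them on demand,
// matching the add/call behavior introduced by the patch.
type releaser struct {
	releases []StateReleaseFunc
	lock     sync.Mutex
}

func (r *releaser) add(release StateReleaseFunc) {
	r.lock.Lock()
	defer r.lock.Unlock()
	r.releases = append(r.releases, release)
}

func (r *releaser) call() {
	r.lock.Lock()
	defer r.lock.Unlock()
	for _, release := range r.releases {
		release()
	}
	r.releases = r.releases[:0]
}

func main() {
	reler := new(releaser)
	for i := 0; i < 3; i++ {
		n := i
		reler.add(func() { fmt.Println("released state", n) })
	}
	// Deferred until the dependent (child) state has been built.
	reler.call()
}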
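For the preferDisk switch in traceChain, the decision reduces to a size check on the ephemeral trie database that is carried from block to block. A hypothetical extraction of that check, shown as a fragment without its package/import header and assumed to live in package tracers so that defaultTracechainMemLimit and state resolve; the real logic stays inlined in traceChain.

// shouldPreferDisk reports whether the accumulated in-memory trie data has
// grown past the tracing limit, in which case the next StateAtBlock call is
// asked to restart from an on-disk state if one is available.
func shouldPreferDisk(statedb *state.StateDB) bool {
	if statedb == nil {
		return false // first iteration: no carried-over state yet
	}
	s1, s2, s3 := statedb.Database().TrieDB().Size()
	return s1+s2+s3 > defaultTracechainMemLimit
}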