diff --git a/cmd/geth/main.go b/cmd/geth/main.go index e6a59a52fe..b52cb356da 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -71,6 +71,8 @@ var ( utils.NoUSBFlag, utils.DirectBroadcastFlag, utils.DisableSnapProtocolFlag, + utils.DisableDiffProtocolFlag, + utils.EnableTrustProtocolFlag, utils.DiffSyncFlag, utils.PipeCommitFlag, utils.RangeLimitFlag, @@ -99,6 +101,7 @@ var ( utils.TxPoolLifetimeFlag, utils.TxPoolReannounceTimeFlag, utils.SyncModeFlag, + utils.TriesVerifyModeFlag, utils.ExitWhenSyncedFlag, utils.GCModeFlag, utils.SnapshotFlag, diff --git a/cmd/geth/snapshot.go b/cmd/geth/snapshot.go index ab68c11b4d..0d1d3977f4 100644 --- a/cmd/geth/snapshot.go +++ b/cmd/geth/snapshot.go @@ -30,14 +30,17 @@ import ( "github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state/pruner" "github.com/ethereum/go-ethereum/core/state/snapshot" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" @@ -133,6 +136,32 @@ geth snapshot verify-state will traverse the whole accounts and storages set based on the specified snapshot and recalculate the root hash of state for verification. In other words, this command does the snapshot to trie conversion. +`, + }, + { + Name: "insecure-prune-all", + Usage: "Prune all trie state data except genesis block, it will break storage for fullnode, only suitable for fast node " + + "who do not need trie storage at all", + ArgsUsage: "", + Action: utils.MigrateFlags(pruneAllState), + Category: "MISCELLANEOUS COMMANDS", + Flags: []cli.Flag{ + utils.DataDirFlag, + utils.AncientFlag, + utils.RopstenFlag, + utils.RinkebyFlag, + utils.GoerliFlag, + }, + Description: ` +will prune all historical trie state data except genesis block. +All trie nodes will be deleted from the database. + +It expects the genesis file as argument. + +WARNING: It's necessary to delete the trie clean cache after the pruning. +If you specify another directory for the trie clean cache via "--cache.trie.journal" +during the use of Geth, please also specify it here for correct deletion. Otherwise +the trie clean cache with default directory will be deleted. `, }, { @@ -228,7 +257,7 @@ func accessDb(ctx *cli.Context, stack *node.Node) (ethdb.Database, error) { } headHeader := headBlock.Header() //Make sure the MPT and snapshot matches before pruning, otherwise the node can not start. - snaptree, err := snapshot.New(chaindb, trie.NewDatabase(chaindb), 256, TriesInMemory, headBlock.Root(), false, false, false) + snaptree, err := snapshot.New(chaindb, trie.NewDatabase(chaindb), 256, TriesInMemory, headBlock.Root(), false, false, false, false) if err != nil { log.Error("snaptree error", "err", err) return nil, err // The relevant snapshot(s) might not exist @@ -396,6 +425,48 @@ func pruneState(ctx *cli.Context) error { return nil } +func pruneAllState(ctx *cli.Context) error { + stack, _ := makeConfigNode(ctx) + defer stack.Close() + + genesisPath := ctx.Args().First() + if len(genesisPath) == 0 { + utils.Fatalf("Must supply path to genesis JSON file") + } + file, err := os.Open(genesisPath) + if err != nil { + utils.Fatalf("Failed to read genesis file: %v", err) + } + defer file.Close() + + g := new(core.Genesis) + if err := json.NewDecoder(file).Decode(g); err != nil { + cfg := gethConfig{ + Eth: ethconfig.Defaults, + Node: defaultNodeConfig(), + Metrics: metrics.DefaultConfig, + } + + // Load config file. + if err := loadConfig(genesisPath, &cfg); err != nil { + utils.Fatalf("%v", err) + } + g = cfg.Eth.Genesis + } + + chaindb := utils.MakeChainDatabase(ctx, stack, false, false) + pruner, err := pruner.NewAllPruner(chaindb) + if err != nil { + log.Error("Failed to open snapshot tree", "err", err) + return err + } + if err = pruner.PruneAll(g); err != nil { + log.Error("Failed to prune state", "err", err) + return err + } + return nil +} + func verifyState(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() @@ -406,7 +477,7 @@ func verifyState(ctx *cli.Context) error { log.Error("Failed to load head block") return errors.New("no head block") } - snaptree, err := snapshot.New(chaindb, trie.NewDatabase(chaindb), 256, 128, headBlock.Root(), false, false, false) + snaptree, err := snapshot.New(chaindb, trie.NewDatabase(chaindb), 256, 128, headBlock.Root(), false, false, false, false) if err != nil { log.Error("Failed to open snapshot tree", "err", err) return err @@ -658,7 +729,7 @@ func dumpState(ctx *cli.Context) error { return err } triesInMemory := ctx.GlobalUint64(utils.TriesInMemoryFlag.Name) - snaptree, err := snapshot.New(db, trie.NewDatabase(db), int(triesInMemory), 256, root, false, false, false) + snaptree, err := snapshot.New(db, trie.NewDatabase(db), int(triesInMemory), 256, root, false, false, false, false) if err != nil { return err } diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index 7a76d1e321..43e27c5558 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -41,6 +41,8 @@ var AppHelpFlagGroups = []flags.FlagGroup{ utils.NoUSBFlag, utils.DirectBroadcastFlag, utils.DisableSnapProtocolFlag, + utils.DisableDiffProtocolFlag, + utils.EnableTrustProtocolFlag, utils.RangeLimitFlag, utils.SmartCardDaemonPathFlag, utils.NetworkIdFlag, @@ -50,6 +52,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{ utils.RopstenFlag, utils.SepoliaFlag, utils.SyncModeFlag, + utils.TriesVerifyModeFlag, utils.ExitWhenSyncedFlag, utils.GCModeFlag, utils.TxLookupLimitFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 5fc271e262..130787f2ce 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -124,6 +124,14 @@ var ( Name: "disablesnapprotocol", Usage: "Disable snap protocol", } + DisableDiffProtocolFlag = cli.BoolFlag{ + Name: "disablediffprotocol", + Usage: "Disable diff protocol", + } + EnableTrustProtocolFlag = cli.BoolFlag{ + Name: "enabletrustprotocol", + Usage: "Enable trust protocol", + } DiffSyncFlag = cli.BoolFlag{ Name: "diffsync", Usage: "Enable diffy sync, Please note that enable diffsync will improve the syncing speed, " + @@ -285,6 +293,20 @@ var ( Usage: "The layer of tries trees that keep in memory", Value: 128, } + defaultVerifyMode = ethconfig.Defaults.TriesVerifyMode + TriesVerifyModeFlag = TextMarshalerFlag{ + Name: "tries-verify-mode", + Usage: `tries verify mode: + "local(default): a normal full node with complete state world(both MPT and snapshot), merkle state root will + be verified against the block header.", + "full: a fast node with only snapshot state world. Merkle state root is verified by the trustworthy remote verify node + by comparing the diffhash(an identify of difflayer generated by the block) and state root.", + "insecure: same as full mode, except that it can tolerate without verifying the diffhash when verify node does not have it.", + "none: no merkle state root verification at all, there is no need to setup or connect remote verify node at all, + it is more light comparing to full and insecure mode, but get a very small chance that the state is not consistent + with other peers."`, + Value: &defaultVerifyMode, + } OverrideBerlinFlag = cli.Uint64Flag{ Name: "override.berlin", Usage: "Manually specify Berlin fork-block, overriding the bundled setting", @@ -1646,6 +1668,12 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { if ctx.GlobalIsSet(DisableSnapProtocolFlag.Name) { cfg.DisableSnapProtocol = ctx.GlobalBool(DisableSnapProtocolFlag.Name) } + if ctx.GlobalIsSet(DisableDiffProtocolFlag.Name) { + cfg.DisableDiffProtocol = ctx.GlobalIsSet(DisableDiffProtocolFlag.Name) + } + if ctx.GlobalIsSet(EnableTrustProtocolFlag.Name) { + cfg.EnableTrustProtocol = ctx.GlobalIsSet(EnableTrustProtocolFlag.Name) + } if ctx.GlobalIsSet(DiffSyncFlag.Name) { cfg.DiffSync = ctx.GlobalBool(DiffSyncFlag.Name) } @@ -1679,6 +1707,14 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { if ctx.GlobalIsSet(TriesInMemoryFlag.Name) { cfg.TriesInMemory = ctx.GlobalUint64(TriesInMemoryFlag.Name) } + if ctx.GlobalIsSet(TriesVerifyModeFlag.Name) { + cfg.TriesVerifyMode = *GlobalTextMarshaler(ctx, TriesVerifyModeFlag.Name).(*core.VerifyMode) + // If a node sets verify mode to full or insecure, it's a fast node and need + // to verify blocks from verify nodes, then it should enable trust protocol. + if cfg.TriesVerifyMode.NeedRemoteVerify() { + cfg.EnableTrustProtocol = true + } + } if ctx.GlobalIsSet(CacheFlag.Name) || ctx.GlobalIsSet(CacheSnapshotFlag.Name) { cfg.SnapshotCache = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheSnapshotFlag.Name) / 100 } @@ -1714,7 +1750,7 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { cfg.RPCTxFeeCap = ctx.GlobalFloat64(RPCGlobalTxFeeCapFlag.Name) } if ctx.GlobalIsSet(NoDiscoverFlag.Name) { - cfg.EthDiscoveryURLs, cfg.SnapDiscoveryURLs = []string{}, []string{} + cfg.EthDiscoveryURLs, cfg.SnapDiscoveryURLs, cfg.TrustDiscoveryURLs = []string{}, []string{}, []string{} } else if ctx.GlobalIsSet(DNSDiscoveryFlag.Name) { urls := ctx.GlobalString(DNSDiscoveryFlag.Name) if urls == "" { @@ -1828,6 +1864,7 @@ func SetDNSDiscoveryDefaults(cfg *ethconfig.Config, genesis common.Hash) { if url := params.KnownDNSNetwork(genesis, protocol); url != "" { cfg.EthDiscoveryURLs = []string{url} cfg.SnapDiscoveryURLs = cfg.EthDiscoveryURLs + cfg.TrustDiscoveryURLs = cfg.EthDiscoveryURLs } } diff --git a/core/block_validator.go b/core/block_validator.go index 12bb61d909..e710d7469a 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -30,23 +30,38 @@ import ( const badBlockCacheExpire = 30 * time.Second +type BlockValidatorOption func(*BlockValidator) *BlockValidator + +func EnableRemoteVerifyManager(remoteValidator *remoteVerifyManager) BlockValidatorOption { + return func(bv *BlockValidator) *BlockValidator { + bv.remoteValidator = remoteValidator + return bv + } +} + // BlockValidator is responsible for validating block headers, uncles and // processed state. // // BlockValidator implements Validator. type BlockValidator struct { - config *params.ChainConfig // Chain configuration options - bc *BlockChain // Canonical block chain - engine consensus.Engine // Consensus engine used for validating + config *params.ChainConfig // Chain configuration options + bc *BlockChain // Canonical block chain + engine consensus.Engine // Consensus engine used for validating + remoteValidator *remoteVerifyManager } // NewBlockValidator returns a new block validator which is safe for re-use -func NewBlockValidator(config *params.ChainConfig, blockchain *BlockChain, engine consensus.Engine) *BlockValidator { +func NewBlockValidator(config *params.ChainConfig, blockchain *BlockChain, engine consensus.Engine, opts ...BlockValidatorOption) *BlockValidator { validator := &BlockValidator{ config: config, engine: engine, bc: blockchain, } + + for _, opt := range opts { + validator = opt(validator) + } + return validator } @@ -92,6 +107,12 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error { } return nil }, + func() error { + if v.remoteValidator != nil && !v.remoteValidator.AncestorVerified(block.Header()) { + return fmt.Errorf("%w, number: %s, hash: %s", ErrAncestorHasNotBeenVerified, block.Number(), block.Hash()) + } + return nil + }, } validateRes := make(chan error, len(validateFuns)) for _, f := range validateFuns { @@ -171,6 +192,10 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD return err } +func (v *BlockValidator) RemoteVerifyManager() *remoteVerifyManager { + return v.remoteValidator +} + // CalcGasLimit computes the gas limit of the next block after parent. It aims // to keep the baseline gas close to the provided target, and increase it towards // the target if the baseline gas is lower. diff --git a/core/blockchain.go b/core/blockchain.go index 811391ee10..680bc6ce41 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -28,6 +28,7 @@ import ( "time" lru "github.com/hashicorp/golang-lru" + "golang.org/x/crypto/sha3" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/mclock" @@ -143,6 +144,7 @@ type CacheConfig struct { SnapshotLimit int // Memory allowance (MB) to use for caching snapshot entries in memory Preimages bool // Whether to store preimage of trie key to the disk TriesInMemory uint64 // How many tries keeps in memory + NoTries bool // Insecure settings. Do not have any tries in databases if enabled. SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it } @@ -163,7 +165,7 @@ var defaultCacheConfig = &CacheConfig{ SnapshotWait: true, } -type BlockChainOption func(*BlockChain) *BlockChain +type BlockChainOption func(*BlockChain) (*BlockChain, error) // BlockChain represents the canonical chain given a database with a genesis // block. The Blockchain manages chain imports, reverts, chain reorganisations. @@ -197,15 +199,16 @@ type BlockChain struct { txLookupLimit uint64 triesInMemory uint64 - hc *HeaderChain - rmLogsFeed event.Feed - chainFeed event.Feed - chainSideFeed event.Feed - chainHeadFeed event.Feed - logsFeed event.Feed - blockProcFeed event.Feed - scope event.SubscriptionScope - genesisBlock *types.Block + hc *HeaderChain + rmLogsFeed event.Feed + chainFeed event.Feed + chainSideFeed event.Feed + chainHeadFeed event.Feed + chainBlockFeed event.Feed + logsFeed event.Feed + blockProcFeed event.Feed + scope event.SubscriptionScope + genesisBlock *types.Block // This mutex synchronizes chain write operations. // Readers don't need to take it, they can just read the database. @@ -227,6 +230,7 @@ type BlockChain struct { // trusted diff layers diffLayerCache *lru.Cache // Cache for the diffLayers diffLayerRLPCache *lru.Cache // Cache for the rlp encoded diffLayers + diffLayerChanCache *lru.Cache // Cache for the difflayer channel diffQueue *prque.Prque // A Priority queue to store recent diff layer diffQueueBuffer chan *types.DiffLayer diffLayerFreezerBlockLimit uint64 @@ -279,6 +283,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par futureBlocks, _ := lru.New(maxFutureBlocks) diffLayerCache, _ := lru.New(diffLayerCacheLimit) diffLayerRLPCache, _ := lru.New(diffLayerRLPCacheLimit) + diffLayerChanCache, _ := lru.New(diffLayerCacheLimit) bc := &BlockChain{ chainConfig: chainConfig, @@ -289,6 +294,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par Cache: cacheConfig.TrieCleanLimit, Journal: cacheConfig.TrieCleanJournal, Preimages: cacheConfig.Preimages, + NoTries: cacheConfig.NoTries, }), triesInMemory: cacheConfig.TriesInMemory, quit: make(chan struct{}), @@ -300,6 +306,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par badBlockCache: badBlockCache, diffLayerCache: diffLayerCache, diffLayerRLPCache: diffLayerRLPCache, + diffLayerChanCache: diffLayerChanCache, txLookupCache: txLookupCache, futureBlocks: futureBlocks, engine: engine, @@ -312,6 +319,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par diffNumToBlockHashes: make(map[uint64]map[common.Hash]struct{}), diffPeersToDiffHashes: make(map[string]map[common.Hash]struct{}), } + bc.prefetcher = NewStatePrefetcher(chainConfig, bc, engine) bc.forker = NewForkChoice(bc, shouldPreserve) bc.validator = NewBlockValidator(chainConfig, bc, engine) @@ -448,11 +456,14 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par log.Warn("Enabling snapshot recovery", "chainhead", head.NumberU64(), "diskbase", *layer) recover = true } - bc.snaps, _ = snapshot.New(bc.db, bc.stateCache.TrieDB(), bc.cacheConfig.SnapshotLimit, int(bc.cacheConfig.TriesInMemory), head.Root(), !bc.cacheConfig.SnapshotWait, true, recover) + bc.snaps, _ = snapshot.New(bc.db, bc.stateCache.TrieDB(), bc.cacheConfig.SnapshotLimit, int(bc.cacheConfig.TriesInMemory), head.Root(), !bc.cacheConfig.SnapshotWait, true, recover, bc.stateCache.NoTries()) } // do options before start any routine for _, option := range options { - bc = option(bc) + bc, err = option(bc) + if err != nil { + return nil, err + } } // Start future block processor. bc.wg.Add(1) @@ -513,11 +524,31 @@ func (bc *BlockChain) cacheReceipts(hash common.Hash, receipts types.Receipts) { bc.receiptsCache.Add(hash, receipts) } -func (bc *BlockChain) cacheDiffLayer(diffLayer *types.DiffLayer) { +func (bc *BlockChain) cacheDiffLayer(diffLayer *types.DiffLayer, diffLayerCh chan struct{}) { + sort.SliceStable(diffLayer.Codes, func(i, j int) bool { + return diffLayer.Codes[i].Hash.Hex() < diffLayer.Codes[j].Hash.Hex() + }) + sort.SliceStable(diffLayer.Destructs, func(i, j int) bool { + return diffLayer.Destructs[i].Hex() < (diffLayer.Destructs[j].Hex()) + }) + sort.SliceStable(diffLayer.Accounts, func(i, j int) bool { + return diffLayer.Accounts[i].Account.Hex() < diffLayer.Accounts[j].Account.Hex() + }) + sort.SliceStable(diffLayer.Storages, func(i, j int) bool { + return diffLayer.Storages[i].Account.Hex() < diffLayer.Storages[j].Account.Hex() + }) + for index := range diffLayer.Storages { + // Sort keys and vals by key. + sort.Sort(&diffLayer.Storages[index]) + } + if bc.diffLayerCache.Len() >= diffLayerCacheLimit { bc.diffLayerCache.RemoveOldest() } + bc.diffLayerCache.Add(diffLayer.BlockHash, diffLayer) + close(diffLayerCh) + if bc.db.DiffStore() != nil { // push to priority queue before persisting bc.diffQueueBuffer <- diffLayer @@ -1512,7 +1543,14 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. diffLayer.Receipts = receipts diffLayer.BlockHash = block.Hash() diffLayer.Number = block.NumberU64() - bc.cacheDiffLayer(diffLayer) + + diffLayerCh := make(chan struct{}) + if bc.diffLayerChanCache.Len() >= diffLayerCacheLimit { + bc.diffLayerChanCache.RemoveOldest() + } + bc.diffLayerChanCache.Add(diffLayer.BlockHash, diffLayerCh) + + go bc.cacheDiffLayer(diffLayer, diffLayerCh) } wg.Wait() return nil @@ -1832,6 +1870,9 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool) if err != nil { return it.index, err } + if statedb.NoTrie() { + statedb.SetExpectedStateRoot(block.Root()) + } bc.updateHighestVerifiedHeader(block.Header()) // Enable prefetching to pull in trie node paths while processing transactions @@ -1941,6 +1982,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool) stats.processed++ stats.usedGas += usedGas + bc.chainBlockFeed.Send(ChainHeadEvent{block}) dirty, _ := bc.stateCache.TrieDB().Size() stats.report(chain, it.index, dirty) } @@ -2631,9 +2673,14 @@ func (bc *BlockChain) HandleDiffLayer(diffLayer *types.DiffLayer, pid string, fu return nil } + if diffLayer.DiffHash.Load() == nil { + return fmt.Errorf("unexpected difflayer which diffHash is nil from peeer %s", pid) + } + diffHash := diffLayer.DiffHash.Load().(common.Hash) + bc.diffMux.Lock() defer bc.diffMux.Unlock() - if blockHash, exist := bc.diffHashToBlockHash[diffLayer.DiffHash]; exist && blockHash == diffLayer.BlockHash { + if blockHash, exist := bc.diffHashToBlockHash[diffHash]; exist && blockHash == diffLayer.BlockHash { return nil } @@ -2647,28 +2694,28 @@ func (bc *BlockChain) HandleDiffLayer(diffLayer *types.DiffLayer, pid string, fu return nil } if _, exist := bc.diffPeersToDiffHashes[pid]; exist { - if _, alreadyHas := bc.diffPeersToDiffHashes[pid][diffLayer.DiffHash]; alreadyHas { + if _, alreadyHas := bc.diffPeersToDiffHashes[pid][diffHash]; alreadyHas { return nil } } else { bc.diffPeersToDiffHashes[pid] = make(map[common.Hash]struct{}) } - bc.diffPeersToDiffHashes[pid][diffLayer.DiffHash] = struct{}{} + bc.diffPeersToDiffHashes[pid][diffHash] = struct{}{} if _, exist := bc.diffNumToBlockHashes[diffLayer.Number]; !exist { bc.diffNumToBlockHashes[diffLayer.Number] = make(map[common.Hash]struct{}) } bc.diffNumToBlockHashes[diffLayer.Number][diffLayer.BlockHash] = struct{}{} - if _, exist := bc.diffHashToPeers[diffLayer.DiffHash]; !exist { - bc.diffHashToPeers[diffLayer.DiffHash] = make(map[string]struct{}) + if _, exist := bc.diffHashToPeers[diffHash]; !exist { + bc.diffHashToPeers[diffHash] = make(map[string]struct{}) } - bc.diffHashToPeers[diffLayer.DiffHash][pid] = struct{}{} + bc.diffHashToPeers[diffHash][pid] = struct{}{} if _, exist := bc.blockHashToDiffLayers[diffLayer.BlockHash]; !exist { bc.blockHashToDiffLayers[diffLayer.BlockHash] = make(map[common.Hash]*types.DiffLayer) } - bc.blockHashToDiffLayers[diffLayer.BlockHash][diffLayer.DiffHash] = diffLayer - bc.diffHashToBlockHash[diffLayer.DiffHash] = diffLayer.BlockHash + bc.blockHashToDiffLayers[diffLayer.BlockHash][diffHash] = diffLayer + bc.diffHashToBlockHash[diffHash] = diffLayer.BlockHash return nil } @@ -2869,19 +2916,138 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i func (bc *BlockChain) TriesInMemory() uint64 { return bc.triesInMemory } // Options -func EnableLightProcessor(bc *BlockChain) *BlockChain { +func EnableLightProcessor(bc *BlockChain) (*BlockChain, error) { bc.processor = NewLightStateProcessor(bc.Config(), bc, bc.engine) - return bc + return bc, nil } -func EnablePipelineCommit(bc *BlockChain) *BlockChain { +func EnablePipelineCommit(bc *BlockChain) (*BlockChain, error) { bc.pipeCommit = true - return bc + return bc, nil } func EnablePersistDiff(limit uint64) BlockChainOption { - return func(chain *BlockChain) *BlockChain { + return func(chain *BlockChain) (*BlockChain, error) { chain.diffLayerFreezerBlockLimit = limit - return chain + return chain, nil + } +} + +func EnableBlockValidator(chainConfig *params.ChainConfig, engine consensus.Engine, mode VerifyMode, peers verifyPeers) BlockChainOption { + return func(bc *BlockChain) (*BlockChain, error) { + if mode.NeedRemoteVerify() { + vm, err := NewVerifyManager(bc, peers, mode == InsecureVerify) + if err != nil { + return nil, err + } + go vm.mainLoop() + bc.validator = NewBlockValidator(chainConfig, bc, engine, EnableRemoteVerifyManager(vm)) + } + return bc, nil } } + +func (bc *BlockChain) GetVerifyResult(blockNumber uint64, blockHash common.Hash, diffHash common.Hash) *VerifyResult { + var res VerifyResult + res.BlockNumber = blockNumber + res.BlockHash = blockHash + + if blockNumber > bc.CurrentHeader().Number.Uint64()+maxDiffForkDist { + res.Status = types.StatusBlockTooNew + return &res + } else if blockNumber > bc.CurrentHeader().Number.Uint64() { + res.Status = types.StatusBlockNewer + return &res + } + + header := bc.GetHeaderByHash(blockHash) + if header == nil { + if blockNumber > bc.CurrentHeader().Number.Uint64()-maxDiffForkDist { + res.Status = types.StatusPossibleFork + return &res + } + + res.Status = types.StatusImpossibleFork + return &res + } + + diff := bc.GetTrustedDiffLayer(blockHash) + if diff != nil { + if diff.DiffHash.Load() == nil { + hash, err := CalculateDiffHash(diff) + if err != nil { + res.Status = types.StatusUnexpectedError + return &res + } + + diff.DiffHash.Store(hash) + } + + if diffHash != diff.DiffHash.Load().(common.Hash) { + res.Status = types.StatusDiffHashMismatch + return &res + } + + res.Status = types.StatusFullVerified + res.Root = header.Root + return &res + } + + res.Status = types.StatusPartiallyVerified + res.Root = header.Root + return &res +} + +func (bc *BlockChain) GetTrustedDiffLayer(blockHash common.Hash) *types.DiffLayer { + var diff *types.DiffLayer + if cached, ok := bc.diffLayerCache.Get(blockHash); ok { + diff = cached.(*types.DiffLayer) + return diff + } + + diffStore := bc.db.DiffStore() + if diffStore != nil { + diff = rawdb.ReadDiffLayer(diffStore, blockHash) + } + return diff +} + +func CalculateDiffHash(d *types.DiffLayer) (common.Hash, error) { + if d == nil { + return common.Hash{}, fmt.Errorf("nil diff layer") + } + + diff := &types.ExtDiffLayer{ + BlockHash: d.BlockHash, + Receipts: make([]*types.ReceiptForStorage, 0), + Number: d.Number, + Codes: d.Codes, + Destructs: d.Destructs, + Accounts: d.Accounts, + Storages: d.Storages, + } + + for index, account := range diff.Accounts { + full, err := snapshot.FullAccount(account.Blob) + if err != nil { + return common.Hash{}, fmt.Errorf("decode full account error: %v", err) + } + // set account root to empty root + diff.Accounts[index].Blob = snapshot.SlimAccountRLP(full.Nonce, full.Balance, common.Hash{}, full.CodeHash) + } + + rawData, err := rlp.EncodeToBytes(diff) + if err != nil { + return common.Hash{}, fmt.Errorf("encode new diff error: %v", err) + } + + hasher := sha3.NewLegacyKeccak256() + _, err = hasher.Write(rawData) + if err != nil { + return common.Hash{}, fmt.Errorf("hasher write error: %v", err) + } + + var hash common.Hash + hasher.Sum(hash[:0]) + return hash, nil +} diff --git a/core/blockchain_diff_test.go b/core/blockchain_diff_test.go index d126ad829e..cae4b917b6 100644 --- a/core/blockchain_diff_test.go +++ b/core/blockchain_diff_test.go @@ -286,7 +286,7 @@ func rawDataToDiffLayer(data rlp.RawValue) (*types.DiffLayer, error) { hasher.Write(data) var diffHash common.Hash hasher.Sum(diffHash[:0]) - diff.DiffHash = diffHash + diff.DiffHash.Store(diffHash) hasher.Reset() return &diff, nil } @@ -484,3 +484,164 @@ func TestGetDiffAccounts(t *testing.T) { } } } + +// newTwoForkedBlockchains returns two blockchains, these two chains are generated by different +// generators, they have some same parent blocks, the number of same blocks are determined by +// testBlocks, once chain1 inserted a non-default block, chain1 and chain2 get forked. +func newTwoForkedBlockchains(len1, len2 int) (chain1 *BlockChain, chain2 *BlockChain) { + signer := types.HomesteadSigner{} + // Create a database pre-initialize with a genesis block + db1 := rawdb.NewMemoryDatabase() + db1.SetDiffStore(memorydb.New()) + (&Genesis{ + Config: params.TestChainConfig, + Alloc: GenesisAlloc{testAddr: {Balance: big.NewInt(100000000000000000)}}, + }).MustCommit(db1) + engine1 := ethash.NewFaker() + chain1, _ = NewBlockChain(db1, nil, params.TestChainConfig, engine1, vm.Config{}, nil, nil, EnablePersistDiff(860000), EnableBlockValidator(params.TestChainConfig, engine1, 0, nil)) + generator1 := func(i int, block *BlockGen) { + // The chain maker doesn't have access to a chain, so the difficulty will be + // lets unset (nil). Set it here to the correct value. + block.SetCoinbase(testAddr) + + for idx, testBlock := range testBlocks { + // Specific block setting, the index in this generator has 1 diff from specified blockNr. + if i+1 == testBlock.blockNr { + for _, testTransaction := range testBlock.txs { + var transaction *types.Transaction + if testTransaction.to == nil { + transaction = types.NewContractCreation(block.TxNonce(testAddr), + testTransaction.value, uint64(commonGas), testTransaction.gasPrice, testTransaction.data) + } else { + transaction = types.NewTransaction(block.TxNonce(testAddr), *testTransaction.to, + testTransaction.value, uint64(commonGas), testTransaction.gasPrice, testTransaction.data) + } + tx, err := types.SignTx(transaction, signer, testKey) + if err != nil { + panic(err) + } + block.AddTxWithChain(chain1, tx) + } + break + } + + // Default block setting. + if idx == len(testBlocks)-1 { + // We want to simulate an empty middle block, having the same state as the + // first one. The last is needs a state change again to force a reorg. + for _, testTransaction := range testBlocks[0].txs { + tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddr), *testTransaction.to, + testTransaction.value, uint64(commonGas), testTransaction.gasPrice, testTransaction.data), signer, testKey) + if err != nil { + panic(err) + } + block.AddTxWithChain(chain1, tx) + } + } + } + + } + bs1, _ := GenerateChain(params.TestChainConfig, chain1.Genesis(), ethash.NewFaker(), db1, len1, generator1) + if _, err := chain1.InsertChain(bs1); err != nil { + panic(err) + } + + // Create a database pre-initialize with a genesis block + db2 := rawdb.NewMemoryDatabase() + db2.SetDiffStore(memorydb.New()) + (&Genesis{ + Config: params.TestChainConfig, + Alloc: GenesisAlloc{testAddr: {Balance: big.NewInt(100000000000000000)}}, + }).MustCommit(db2) + engine2 := ethash.NewFaker() + chain2, _ = NewBlockChain(db2, nil, params.TestChainConfig, ethash.NewFaker(), vm.Config{}, nil, nil, EnablePersistDiff(860000), EnableBlockValidator(params.TestChainConfig, engine2, 0, nil)) + generator2 := func(i int, block *BlockGen) { + // The chain maker doesn't have access to a chain, so the difficulty will be + // lets unset (nil). Set it here to the correct value. + block.SetCoinbase(testAddr) + // We want to simulate an empty middle block, having the same state as the + // first one. The last is needs a state change again to force a reorg. + for _, testTransaction := range testBlocks[0].txs { + tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddr), *testTransaction.to, + testTransaction.value, uint64(commonGas), testTransaction.gasPrice, testTransaction.data), signer, testKey) + if err != nil { + panic(err) + } + block.AddTxWithChain(chain1, tx) + } + } + bs2, _ := GenerateChain(params.TestChainConfig, chain2.Genesis(), ethash.NewFaker(), db2, len2, generator2) + if _, err := chain2.InsertChain(bs2); err != nil { + panic(err) + } + + return chain1, chain2 +} + +func testGetRootByDiffHash(t *testing.T, chain1, chain2 *BlockChain, blockNumber uint64, status types.VerifyStatus) { + block2 := chain2.GetBlockByNumber(blockNumber) + if block2 == nil { + t.Fatalf("failed to find block, number: %v", blockNumber) + } + expect := VerifyResult{ + Status: status, + BlockNumber: blockNumber, + BlockHash: block2.Hash(), + } + if status.Code&0xff00 == types.StatusVerified.Code { + expect.Root = block2.Root() + } + + diffLayer2 := chain2.GetTrustedDiffLayer(block2.Hash()) + if diffLayer2 == nil { + t.Fatal("failed to find diff layer") + } + diffHash2 := types.EmptyRootHash + if status != types.StatusDiffHashMismatch { + var err error + diffHash2, err = CalculateDiffHash(diffLayer2) + if err != nil { + t.Fatalf("failed to compute diff hash: %v", err) + } + } + + if status == types.StatusPartiallyVerified { + block1 := chain1.GetBlockByNumber(blockNumber) + if block1 == nil { + t.Fatalf("failed to find block, number: %v", blockNumber) + } + chain1.diffLayerCache.Remove(block1.Hash()) + } + + result := chain1.GetVerifyResult(blockNumber, block2.Hash(), diffHash2) + if result.Status != expect.Status { + t.Fatalf("failed to verify block, number: %v, expect status: %v, real status: %v", blockNumber, expect.Status, result.Status) + } + if result.Root != expect.Root { + t.Fatalf("failed to verify block, number: %v, expect root: %v, real root: %v", blockNumber, expect.Root, result.Root) + } +} + +func TestGetRootByDiffHash(t *testing.T) { + len1 := 23 // length of blockchain1 + len2 := 35 // length of blockchain2 + plen := 11 // length of same parent blocks, which determined by testBlocks. + + chain1, chain2 := newTwoForkedBlockchains(len1, len2) + defer chain1.Stop() + defer chain2.Stop() + + hash1 := chain1.GetBlockByNumber(uint64(plen)).Hash() + hash2 := chain2.GetBlockByNumber(uint64(plen)).Hash() + if hash1 != hash2 { + t.Errorf("chain content mismatch at %d: have hash %v, want hash %v", plen, hash2, hash1) + } + + testGetRootByDiffHash(t, chain1, chain2, 10, types.StatusFullVerified) + testGetRootByDiffHash(t, chain1, chain2, 2, types.StatusPartiallyVerified) + testGetRootByDiffHash(t, chain1, chain2, 10, types.StatusDiffHashMismatch) + testGetRootByDiffHash(t, chain1, chain2, 12, types.StatusImpossibleFork) + testGetRootByDiffHash(t, chain1, chain2, 20, types.StatusPossibleFork) + testGetRootByDiffHash(t, chain1, chain2, 24, types.StatusBlockNewer) + testGetRootByDiffHash(t, chain1, chain2, 35, types.StatusBlockTooNew) +} diff --git a/core/blockchain_notries_test.go b/core/blockchain_notries_test.go new file mode 100644 index 0000000000..7b7a977971 --- /dev/null +++ b/core/blockchain_notries_test.go @@ -0,0 +1,226 @@ +// Copyright 2020 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// Tests that abnormal program termination (i.e.crash) and restart doesn't leave +// the database in some strange state with gaps in the chain, nor with block data +// dangling in the future. + +package core + +import ( + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus/ethash" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/ethdb/memorydb" + "github.com/ethereum/go-ethereum/params" +) + +func newMockVerifyPeer() *mockVerifyPeer { + return &mockVerifyPeer{} +} + +type requestRoot struct { + blockNumber uint64 + blockHash common.Hash + diffHash common.Hash +} + +type verifFailedStatus struct { + status types.VerifyStatus + blockNumber uint64 +} + +// mockVerifyPeer is a mocking struct that simulates p2p signals for verification tasks. +type mockVerifyPeer struct { + callback func(*requestRoot) +} + +func (peer *mockVerifyPeer) setCallBack(callback func(*requestRoot)) { + peer.callback = callback +} + +func (peer *mockVerifyPeer) RequestRoot(blockNumber uint64, blockHash common.Hash, diffHash common.Hash) error { + if peer.callback != nil { + peer.callback(&requestRoot{blockNumber, blockHash, diffHash}) + } + return nil +} + +func (peer *mockVerifyPeer) ID() string { + return "mock_peer" +} + +type mockVerifyPeers struct { + peers []VerifyPeer +} + +func (peers *mockVerifyPeers) GetVerifyPeers() []VerifyPeer { + return peers.peers +} + +func newMockRemoteVerifyPeer(peers []VerifyPeer) *mockVerifyPeers { + return &mockVerifyPeers{peers} +} + +func makeTestBackendWithRemoteValidator(blocks int, mode VerifyMode, failed *verifFailedStatus) (*testBackend, *testBackend, []*types.Block, error) { + signer := types.HomesteadSigner{} + + // Create a database pre-initialize with a genesis block + db := rawdb.NewMemoryDatabase() + db.SetDiffStore(memorydb.New()) + (&Genesis{ + Config: params.TestChainConfig, + Alloc: GenesisAlloc{testAddr: {Balance: big.NewInt(100000000000000000)}}, + }).MustCommit(db) + engine := ethash.NewFaker() + + db2 := rawdb.NewMemoryDatabase() + db2.SetDiffStore(memorydb.New()) + (&Genesis{ + Config: params.TestChainConfig, + Alloc: GenesisAlloc{testAddr: {Balance: big.NewInt(100000000000000000)}}, + }).MustCommit(db2) + engine2 := ethash.NewFaker() + + peer := newMockVerifyPeer() + peers := []VerifyPeer{peer} + + verifier, err := NewBlockChain(db, nil, params.TestChainConfig, engine, vm.Config{}, + nil, nil, EnablePersistDiff(100000), EnableBlockValidator(params.TestChainConfig, engine2, LocalVerify, nil)) + if err != nil { + return nil, nil, nil, err + } + + fastnode, err := NewBlockChain(db2, nil, params.TestChainConfig, engine2, vm.Config{}, + nil, nil, EnableBlockValidator(params.TestChainConfig, engine2, mode, newMockRemoteVerifyPeer(peers))) + if err != nil { + return nil, nil, nil, err + } + + generator := func(i int, block *BlockGen) { + // The chain maker doesn't have access to a chain, so the difficulty will be + // lets unset (nil). Set it here to the correct value. + block.SetCoinbase(testAddr) + + for idx, testBlock := range testBlocks { + // Specific block setting, the index in this generator has 1 diff from specified blockNr. + if i+1 == testBlock.blockNr { + for _, testTransaction := range testBlock.txs { + var transaction *types.Transaction + if testTransaction.to == nil { + transaction = types.NewContractCreation(block.TxNonce(testAddr), + testTransaction.value, uint64(commonGas), testTransaction.gasPrice, testTransaction.data) + } else { + transaction = types.NewTransaction(block.TxNonce(testAddr), *testTransaction.to, + testTransaction.value, uint64(commonGas), testTransaction.gasPrice, testTransaction.data) + } + tx, err := types.SignTx(transaction, signer, testKey) + if err != nil { + panic(err) + } + block.AddTxWithChain(verifier, tx) + } + break + } + + // Default block setting. + if idx == len(testBlocks)-1 { + // We want to simulate an empty middle block, having the same state as the + // first one. The last is needs a state change again to force a reorg. + for _, testTransaction := range testBlocks[0].txs { + tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddr), *testTransaction.to, + testTransaction.value, uint64(commonGas), testTransaction.gasPrice, testTransaction.data), signer, testKey) + if err != nil { + panic(err) + } + block.AddTxWithChain(verifier, tx) + } + } + } + } + + bs, _ := GenerateChain(params.TestChainConfig, verifier.Genesis(), ethash.NewFaker(), db, blocks, generator) + + peer.setCallBack(func(req *requestRoot) { + if fastnode.validator != nil && fastnode.validator.RemoteVerifyManager() != nil { + resp := verifier.GetVerifyResult(req.blockNumber, req.blockHash, req.diffHash) + if failed != nil && req.blockNumber == failed.blockNumber { + resp.Status = failed.status + } + fastnode.validator.RemoteVerifyManager(). + HandleRootResponse( + resp, peer.ID()) + } + }) + if _, err := verifier.InsertChain(bs); err != nil { + return nil, nil, nil, err + } + + return &testBackend{ + db: db, + chain: verifier, + }, + &testBackend{ + db: db2, + chain: fastnode, + }, bs, nil +} + +func TestFastNode(t *testing.T) { + // test full mode and succeed + _, fastnode, blocks, err := makeTestBackendWithRemoteValidator(10240, FullVerify, nil) + if err != nil { + t.Fatalf(err.Error()) + } + _, err = fastnode.chain.InsertChain(blocks) + if err != nil { + t.Fatalf(err.Error()) + } + // test full mode and failed + failed := &verifFailedStatus{status: types.StatusDiffHashMismatch, blockNumber: 2048} + _, fastnode, blocks, err = makeTestBackendWithRemoteValidator(10240, FullVerify, failed) + if err != nil { + t.Fatalf(err.Error()) + } + _, err = fastnode.chain.InsertChain(blocks) + if err == nil || fastnode.chain.CurrentBlock().NumberU64() != failed.blockNumber+10 { + t.Fatalf("blocks insert should be failed at height %d", failed.blockNumber+11) + } + // test insecure mode and succeed + _, fastnode, blocks, err = makeTestBackendWithRemoteValidator(10240, InsecureVerify, nil) + if err != nil { + t.Fatalf(err.Error()) + } + _, err = fastnode.chain.InsertChain(blocks) + if err != nil { + t.Fatalf(err.Error()) + } + // test insecure mode and failed + failed = &verifFailedStatus{status: types.StatusImpossibleFork, blockNumber: 2048} + _, fastnode, blocks, err = makeTestBackendWithRemoteValidator(10240, FullVerify, failed) + if err != nil { + t.Fatalf(err.Error()) + } + _, err = fastnode.chain.InsertChain(blocks) + if err == nil || fastnode.chain.CurrentBlock().NumberU64() != failed.blockNumber+10 { + t.Fatalf("blocks insert should be failed at height %d", failed.blockNumber+11) + } +} diff --git a/core/blockchain_reader.go b/core/blockchain_reader.go index 5d983d742c..219f0e0804 100644 --- a/core/blockchain_reader.go +++ b/core/blockchain_reader.go @@ -376,6 +376,11 @@ func (bc *BlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Su return bc.scope.Track(bc.chainHeadFeed.Subscribe(ch)) } +// SubscribeChainHeadEvent registers a subscription of ChainHeadEvent. +func (bc *BlockChain) SubscribeChainBlockEvent(ch chan<- ChainHeadEvent) event.Subscription { + return bc.scope.Track(bc.chainBlockFeed.Subscribe(ch)) +} + // SubscribeChainSideEvent registers a subscription of ChainSideEvent. func (bc *BlockChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription { return bc.scope.Track(bc.chainSideFeed.Subscribe(ch)) diff --git a/core/blockchain_test.go b/core/blockchain_test.go index bc0d57e085..041884bcc1 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -264,7 +264,7 @@ func TestBlockImportVerification(t *testing.T) { } defer processor.Stop() // Start fork from current height - processor = EnablePipelineCommit(processor) + processor, _ = EnablePipelineCommit(processor) testInvalidStateRootBlockImport(t, processor, length, 10, true) } diff --git a/core/error.go b/core/error.go index 0cc61b7a55..c4ba1e9d37 100644 --- a/core/error.go +++ b/core/error.go @@ -37,6 +37,12 @@ var ( // ErrDiffLayerNotFound is returned when diff layer not found. ErrDiffLayerNotFound = errors.New("diff layer not found") + // ErrDiffLayerNotFound is returned when block - 11 has not been verified by the remote verifier. + ErrAncestorHasNotBeenVerified = errors.New("block ancestor has not been verified") + + // ErrCurrentBlockNotFound is returned when current block not found. + ErrCurrentBlockNotFound = errors.New("current block not found") + // ErrKnownBadBlock is return when the block is a known bad block ErrKnownBadBlock = errors.New("already known bad block") ) diff --git a/core/remote_state_verifier.go b/core/remote_state_verifier.go new file mode 100644 index 0000000000..cb8d2366bb --- /dev/null +++ b/core/remote_state_verifier.go @@ -0,0 +1,446 @@ +package core + +import ( + "fmt" + "math/big" + "math/rand" + "sync" + "time" + + lru "github.com/hashicorp/golang-lru" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" +) + +const ( + verifiedCacheSize = 256 + maxForkHeight = 11 + + // defaultPeerNumber is default number of verify peers + defaultPeerNumber = 3 + // pruneHeightDiff indicates that if the height difference between current block and task's + // corresponding block is larger than it, the task should be pruned. + pruneHeightDiff = 15 + pruneInterval = 5 * time.Second + resendInterval = 2 * time.Second + // tryAllPeersTime is the time that a block has not been verified and then try all the valid verify peers. + tryAllPeersTime = 15 * time.Second + // maxWaitVerifyResultTime is the max time of waiting for ancestor's verify result. + maxWaitVerifyResultTime = 30 * time.Second +) + +var ( + verifyTaskCounter = metrics.NewRegisteredCounter("verifymanager/task/total", nil) + verifyTaskSucceedMeter = metrics.NewRegisteredMeter("verifymanager/task/result/succeed", nil) + verifyTaskFailedMeter = metrics.NewRegisteredMeter("verifymanager/task/result/failed", nil) + + verifyTaskExecutionTimer = metrics.NewRegisteredTimer("verifymanager/task/execution", nil) +) + +type remoteVerifyManager struct { + bc *BlockChain + taskLock sync.RWMutex + tasks map[common.Hash]*verifyTask + peers verifyPeers + verifiedCache *lru.Cache + allowInsecure bool + + // Subscription + chainBlockCh chan ChainHeadEvent + chainHeadSub event.Subscription + + // Channels + verifyCh chan common.Hash + messageCh chan verifyMessage +} + +func NewVerifyManager(blockchain *BlockChain, peers verifyPeers, allowInsecure bool) (*remoteVerifyManager, error) { + verifiedCache, _ := lru.New(verifiedCacheSize) + block := blockchain.CurrentBlock() + if block == nil { + return nil, ErrCurrentBlockNotFound + } + + // rewind to last non verified block + number := new(big.Int).Sub(block.Number(), big.NewInt(int64(maxForkHeight))) + if number.Cmp(common.Big0) < 0 { + blockchain.SetHead(0) + } else { + numberU64 := number.Uint64() + blockchain.SetHead(numberU64) + block := blockchain.GetBlockByNumber(numberU64) + for i := 0; i < maxForkHeight && block.NumberU64() > 0; i++ { + // When inserting a block, + // the block before 11 blocks will be verified, + // so the parent block of 11-22 will directly write the verification information. + verifiedCache.Add(block.Hash(), true) + block = blockchain.GetBlockByHash(block.ParentHash()) + if block == nil { + return nil, fmt.Errorf("block is nil, number: %d", number) + } + } + } + + vm := &remoteVerifyManager{ + bc: blockchain, + tasks: make(map[common.Hash]*verifyTask), + peers: peers, + verifiedCache: verifiedCache, + allowInsecure: allowInsecure, + + chainBlockCh: make(chan ChainHeadEvent, chainHeadChanSize), + verifyCh: make(chan common.Hash, maxForkHeight), + messageCh: make(chan verifyMessage), + } + vm.chainHeadSub = blockchain.SubscribeChainBlockEvent(vm.chainBlockCh) + return vm, nil +} + +func (vm *remoteVerifyManager) mainLoop() { + defer vm.chainHeadSub.Unsubscribe() + + pruneTicker := time.NewTicker(pruneInterval) + defer pruneTicker.Stop() + for { + select { + case h := <-vm.chainBlockCh: + vm.NewBlockVerifyTask(h.Block.Header()) + case hash := <-vm.verifyCh: + vm.cacheBlockVerified(hash) + vm.taskLock.Lock() + if task, ok := vm.tasks[hash]; ok { + vm.CloseTask(task) + verifyTaskSucceedMeter.Mark(1) + verifyTaskExecutionTimer.Update(time.Since(task.startAt)) + } + vm.taskLock.Unlock() + case <-pruneTicker.C: + vm.taskLock.Lock() + for _, task := range vm.tasks { + if vm.bc.insertStopped() || (vm.bc.CurrentHeader().Number.Cmp(task.blockHeader.Number) == 1 && + vm.bc.CurrentHeader().Number.Uint64()-task.blockHeader.Number.Uint64() > pruneHeightDiff) { + vm.CloseTask(task) + verifyTaskFailedMeter.Mark(1) + } + } + vm.taskLock.Unlock() + case message := <-vm.messageCh: + vm.taskLock.RLock() + if vt, ok := vm.tasks[message.verifyResult.BlockHash]; ok { + vt.messageCh <- message + } + vm.taskLock.RUnlock() + // System stopped + case <-vm.bc.quit: + vm.taskLock.RLock() + for _, task := range vm.tasks { + task.Close() + } + vm.taskLock.RUnlock() + return + case <-vm.chainHeadSub.Err(): + return + } + } +} + +func (vm *remoteVerifyManager) NewBlockVerifyTask(header *types.Header) { + for i := 0; header != nil && i <= maxForkHeight; i++ { + // if is genesis block, mark it as verified and break. + if header.Number.Uint64() == 0 { + vm.cacheBlockVerified(header.Hash()) + break + } + func(hash common.Hash) { + // if verified cache record that this block has been verified, skip. + if _, ok := vm.verifiedCache.Get(hash); ok { + return + } + // if there already has a verify task for this block, skip. + vm.taskLock.RLock() + _, ok := vm.tasks[hash] + vm.taskLock.RUnlock() + if ok { + return + } + + if header.TxHash == types.EmptyRootHash { + log.Debug("this is an empty block:", "block", hash, "number", header.Number) + vm.cacheBlockVerified(hash) + return + } + + var diffLayer *types.DiffLayer + if cached, ok := vm.bc.diffLayerChanCache.Get(hash); ok { + diffLayerCh := cached.(chan struct{}) + <-diffLayerCh + diffLayer = vm.bc.GetTrustedDiffLayer(hash) + } + // if this block has no diff, there is no need to verify it. + if diffLayer == nil { + log.Info("block's trusted diffLayer is nil", "hash", hash, "number", header.Number) + return + } + diffHash, err := CalculateDiffHash(diffLayer) + if err != nil { + log.Error("failed to get diff hash", "block", hash, "number", header.Number, "error", err) + return + } + verifyTask := NewVerifyTask(diffHash, header, vm.peers, vm.verifyCh, vm.allowInsecure) + vm.taskLock.Lock() + vm.tasks[hash] = verifyTask + vm.taskLock.Unlock() + verifyTaskCounter.Inc(1) + }(header.Hash()) + header = vm.bc.GetHeaderByHash(header.ParentHash) + } +} + +func (vm *remoteVerifyManager) cacheBlockVerified(hash common.Hash) { + if vm.verifiedCache.Len() >= verifiedCacheSize { + vm.verifiedCache.RemoveOldest() + } + vm.verifiedCache.Add(hash, true) +} + +// AncestorVerified function check block has been verified or it's a empty block. +func (vm *remoteVerifyManager) AncestorVerified(header *types.Header) bool { + // find header of H-11 block. + header = vm.bc.GetHeaderByNumber(header.Number.Uint64() - maxForkHeight) + // If start from genesis block, there has not a H-11 block. + if header == nil { + return true + } + + hash := header.Hash() + + // Check if the task is complete + vm.taskLock.RLock() + task, exist := vm.tasks[hash] + vm.taskLock.RUnlock() + timeout := time.NewTimer(maxWaitVerifyResultTime) + defer timeout.Stop() + if exist { + select { + case <-task.terminalCh: + case <-timeout.C: + return false + } + } + + _, exist = vm.verifiedCache.Get(hash) + return exist +} + +func (vm *remoteVerifyManager) HandleRootResponse(vr *VerifyResult, pid string) error { + vm.messageCh <- verifyMessage{verifyResult: vr, peerId: pid} + return nil +} + +func (vm *remoteVerifyManager) CloseTask(task *verifyTask) { + delete(vm.tasks, task.blockHeader.Hash()) + task.Close() + verifyTaskCounter.Dec(1) +} + +type VerifyResult struct { + Status types.VerifyStatus + BlockNumber uint64 + BlockHash common.Hash + Root common.Hash +} + +type verifyMessage struct { + verifyResult *VerifyResult + peerId string +} + +type verifyTask struct { + diffhash common.Hash + blockHeader *types.Header + candidatePeers verifyPeers + badPeers map[string]struct{} + startAt time.Time + allowInsecure bool + + messageCh chan verifyMessage + terminalCh chan struct{} +} + +func NewVerifyTask(diffhash common.Hash, header *types.Header, peers verifyPeers, verifyCh chan common.Hash, allowInsecure bool) *verifyTask { + vt := &verifyTask{ + diffhash: diffhash, + blockHeader: header, + candidatePeers: peers, + badPeers: make(map[string]struct{}), + allowInsecure: allowInsecure, + messageCh: make(chan verifyMessage), + terminalCh: make(chan struct{}), + } + go vt.Start(verifyCh) + return vt +} + +func (vt *verifyTask) Close() { + // It is safe to call close multiple + select { + case <-vt.terminalCh: + default: + close(vt.terminalCh) + } +} + +func (vt *verifyTask) Start(verifyCh chan common.Hash) { + vt.startAt = time.Now() + + vt.sendVerifyRequest(defaultPeerNumber) + resend := time.NewTicker(resendInterval) + defer resend.Stop() + for { + select { + case msg := <-vt.messageCh: + switch msg.verifyResult.Status { + case types.StatusFullVerified: + vt.compareRootHashAndMark(msg, verifyCh) + case types.StatusPartiallyVerified: + log.Warn("block is insecure verified", "hash", msg.verifyResult.BlockHash, "number", msg.verifyResult.BlockNumber) + if vt.allowInsecure { + vt.compareRootHashAndMark(msg, verifyCh) + } + case types.StatusDiffHashMismatch, types.StatusImpossibleFork, types.StatusUnexpectedError: + vt.badPeers[msg.peerId] = struct{}{} + log.Info("peer is not available", "hash", msg.verifyResult.BlockHash, "number", msg.verifyResult.BlockNumber, "peer", msg.peerId, "reason", msg.verifyResult.Status.Msg) + case types.StatusBlockTooNew, types.StatusBlockNewer, types.StatusPossibleFork: + log.Info("return msg from peer", "peerId", msg.peerId, "hash", msg.verifyResult.BlockHash, "msg", msg.verifyResult.Status.Msg) + } + newVerifyMsgTypeGauge(msg.verifyResult.Status.Code, msg.peerId).Inc(1) + case <-resend.C: + // if a task has run over 15s, try all the vaild peers to verify. + if time.Since(vt.startAt) < tryAllPeersTime { + vt.sendVerifyRequest(1) + } else { + vt.sendVerifyRequest(-1) + } + case <-vt.terminalCh: + return + } + } +} + +// sendVerifyRequest func select at most n peers from (candidatePeers-badPeers) randomly and send verify request. +// when n<0, send to all the peers exclude badPeers. +func (vt *verifyTask) sendVerifyRequest(n int) { + var validPeers []VerifyPeer + candidatePeers := vt.candidatePeers.GetVerifyPeers() + for _, p := range candidatePeers { + if _, ok := vt.badPeers[p.ID()]; !ok { + validPeers = append(validPeers, p) + } + } + // if has not valid peer, log warning. + if len(validPeers) == 0 { + log.Warn("there is no valid peer for block", "number", vt.blockHeader.Number) + return + } + + if n < len(validPeers) && n > 0 { + rand.Seed(time.Now().UnixNano()) + rand.Shuffle(len(validPeers), func(i, j int) { validPeers[i], validPeers[j] = validPeers[j], validPeers[i] }) + } else { + n = len(validPeers) + } + for i := 0; i < n; i++ { + p := validPeers[i] + p.RequestRoot(vt.blockHeader.Number.Uint64(), vt.blockHeader.Hash(), vt.diffhash) + } +} + +func (vt *verifyTask) compareRootHashAndMark(msg verifyMessage, verifyCh chan common.Hash) { + if msg.verifyResult.Root == vt.blockHeader.Root { + // write back to manager so that manager can cache the result and delete this task. + verifyCh <- msg.verifyResult.BlockHash + } else { + vt.badPeers[msg.peerId] = struct{}{} + } +} + +type VerifyPeer interface { + RequestRoot(blockNumber uint64, blockHash common.Hash, diffHash common.Hash) error + ID() string +} + +type verifyPeers interface { + GetVerifyPeers() []VerifyPeer +} + +type VerifyMode uint32 + +const ( + LocalVerify VerifyMode = iota + FullVerify + InsecureVerify + NoneVerify +) + +func (mode VerifyMode) IsValid() bool { + return mode >= LocalVerify && mode <= NoneVerify +} + +func (mode VerifyMode) String() string { + switch mode { + case LocalVerify: + return "local" + case FullVerify: + return "full" + case InsecureVerify: + return "insecure" + case NoneVerify: + return "none" + default: + return "unknown" + } +} + +func (mode VerifyMode) MarshalText() ([]byte, error) { + switch mode { + case LocalVerify: + return []byte("local"), nil + case FullVerify: + return []byte("full"), nil + case InsecureVerify: + return []byte("insecure"), nil + case NoneVerify: + return []byte("none"), nil + default: + return nil, fmt.Errorf("unknown verify mode %d", mode) + } +} + +func (mode *VerifyMode) UnmarshalText(text []byte) error { + switch string(text) { + case "local": + *mode = LocalVerify + case "full": + *mode = FullVerify + case "insecure": + *mode = InsecureVerify + case "none": + *mode = NoneVerify + default: + return fmt.Errorf(`unknown sync mode %q, want "full", "light" or "insecure"`, text) + } + return nil +} + +func (mode VerifyMode) NeedRemoteVerify() bool { + return mode == FullVerify || mode == InsecureVerify +} + +func newVerifyMsgTypeGauge(msgType uint16, peerId string) metrics.Gauge { + m := fmt.Sprintf("verifymanager/message/%d/peer/%s", msgType, peerId) + return metrics.GetOrRegisterGauge(m, nil) +} diff --git a/core/state/database.go b/core/state/database.go index 350f513948..0f31bc9139 100644 --- a/core/state/database.go +++ b/core/state/database.go @@ -75,6 +75,9 @@ type Database interface { // Purge cache Purge() + + // NoTries returns whether the database has tries storage. + NoTries() bool } // Trie is a Ethereum Merkle Patricia trie. @@ -138,10 +141,12 @@ func NewDatabase(db ethdb.Database) Database { func NewDatabaseWithConfig(db ethdb.Database, config *trie.Config) Database { csc, _ := lru.New(codeSizeCacheSize) cc, _ := lru.New(codeCacheSize) + noTries := config != nil && config.NoTries return &cachingDB{ db: trie.NewDatabaseWithConfig(db, config), codeSizeCache: csc, codeCache: cc, + noTries: noTries, } } @@ -150,6 +155,7 @@ func NewDatabaseWithConfigAndCache(db ethdb.Database, config *trie.Config) Datab cc, _ := lru.New(codeCacheSize) atc, _ := lru.New(accountTrieCacheSize) stc, _ := lru.New(storageTrieCacheSize) + noTries := config != nil && config.NoTries database := &cachingDB{ db: trie.NewDatabaseWithConfig(db, config), @@ -157,8 +163,11 @@ func NewDatabaseWithConfigAndCache(db ethdb.Database, config *trie.Config) Datab codeCache: cc, accountTrieCache: atc, storageTrieCache: stc, + noTries: noTries, + } + if !noTries { + go database.purgeLoop() } - go database.purgeLoop() return database } @@ -168,6 +177,7 @@ type cachingDB struct { codeCache *lru.Cache accountTrieCache *lru.Cache storageTrieCache *lru.Cache + noTries bool } type triePair struct { @@ -191,6 +201,9 @@ func (db *cachingDB) purgeLoop() { // OpenTrie opens the main account trie at a specific root hash. func (db *cachingDB) OpenTrie(root common.Hash) (Trie, error) { + if db.noTries { + return trie.NewEmptyTrie(), nil + } if db.accountTrieCache != nil { if tr, exist := db.accountTrieCache.Get(root); exist { return tr.(Trie).(*trie.SecureTrie).Copy(), nil @@ -205,6 +218,9 @@ func (db *cachingDB) OpenTrie(root common.Hash) (Trie, error) { // OpenStorageTrie opens the storage trie of an account. func (db *cachingDB) OpenStorageTrie(addrHash, root common.Hash) (Trie, error) { + if db.noTries { + return trie.NewEmptyTrie(), nil + } if db.storageTrieCache != nil { if tries, exist := db.storageTrieCache.Get(addrHash); exist { triesPairs := tries.([3]*triePair) @@ -250,6 +266,10 @@ func (db *cachingDB) CacheStorage(addrHash common.Hash, root common.Hash, t Trie } } +func (db *cachingDB) NoTries() bool { + return db.noTries +} + func (db *cachingDB) Purge() { if db.storageTrieCache != nil { db.storageTrieCache.Purge() @@ -267,6 +287,8 @@ func (db *cachingDB) CopyTrie(t Trie) Trie { switch t := t.(type) { case *trie.SecureTrie: return t.Copy() + case *trie.EmptyTrie: + return t.Copy() default: panic(fmt.Errorf("unknown trie type %T", t)) } diff --git a/core/state/pruner/pruner.go b/core/state/pruner/pruner.go index 459f8ecc1d..8099ee5cb7 100644 --- a/core/state/pruner/pruner.go +++ b/core/state/pruner/pruner.go @@ -31,7 +31,9 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state/snapshot" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" @@ -102,7 +104,7 @@ func NewPruner(db ethdb.Database, datadir, trieCachePath string, bloomSize, trie if headBlock == nil { return nil, errors.New("Failed to load head block") } - snaptree, err := snapshot.New(db, trie.NewDatabase(db), 256, int(triesInMemory), headBlock.Root(), false, false, false) + snaptree, err := snapshot.New(db, trie.NewDatabase(db), 256, int(triesInMemory), headBlock.Root(), false, false, false, false) if err != nil { return nil, err // The relevant snapshot(s) might not exist } @@ -137,6 +139,105 @@ func NewBlockPruner(db ethdb.Database, n *node.Node, oldAncientPath, newAncientP } } +func NewAllPruner(db ethdb.Database) (*Pruner, error) { + headBlock := rawdb.ReadHeadBlock(db) + if headBlock == nil { + return nil, errors.New("Failed to load head block") + } + return &Pruner{ + db: db, + }, nil +} + +func (p *Pruner) PruneAll(genesis *core.Genesis) error { + deleteCleanTrieCache(p.trieCachePath) + return pruneAll(p.db, genesis) +} + +func pruneAll(maindb ethdb.Database, g *core.Genesis) error { + var ( + count int + size common.StorageSize + pstart = time.Now() + logged = time.Now() + batch = maindb.NewBatch() + iter = maindb.NewIterator(nil, nil) + ) + start := time.Now() + for iter.Next() { + key := iter.Key() + if len(key) == common.HashLength { + count += 1 + size += common.StorageSize(len(key) + len(iter.Value())) + batch.Delete(key) + + var eta time.Duration // Realistically will never remain uninited + if done := binary.BigEndian.Uint64(key[:8]); done > 0 { + var ( + left = math.MaxUint64 - binary.BigEndian.Uint64(key[:8]) + speed = done/uint64(time.Since(pstart)/time.Millisecond+1) + 1 // +1s to avoid division by zero + ) + eta = time.Duration(left/speed) * time.Millisecond + } + if time.Since(logged) > 8*time.Second { + log.Info("Pruning state data", "nodes", count, "size", size, + "elapsed", common.PrettyDuration(time.Since(pstart)), "eta", common.PrettyDuration(eta)) + logged = time.Now() + } + // Recreate the iterator after every batch commit in order + // to allow the underlying compactor to delete the entries. + if batch.ValueSize() >= ethdb.IdealBatchSize { + batch.Write() + batch.Reset() + + iter.Release() + iter = maindb.NewIterator(nil, key) + } + } + } + if batch.ValueSize() > 0 { + batch.Write() + batch.Reset() + } + iter.Release() + log.Info("Pruned state data", "nodes", count, "size", size, "elapsed", common.PrettyDuration(time.Since(pstart))) + + // Start compactions, will remove the deleted data from the disk immediately. + // Note for small pruning, the compaction is skipped. + if count >= rangeCompactionThreshold { + cstart := time.Now() + for b := 0x00; b <= 0xf0; b += 0x10 { + var ( + start = []byte{byte(b)} + end = []byte{byte(b + 0x10)} + ) + if b == 0xf0 { + end = nil + } + log.Info("Compacting database", "range", fmt.Sprintf("%#x-%#x", start, end), "elapsed", common.PrettyDuration(time.Since(cstart))) + if err := maindb.Compact(start, end); err != nil { + log.Error("Database compaction failed", "error", err) + return err + } + } + log.Info("Database compaction finished", "elapsed", common.PrettyDuration(time.Since(cstart))) + } + statedb, _ := state.New(common.Hash{}, state.NewDatabase(maindb), nil) + for addr, account := range g.Alloc { + statedb.AddBalance(addr, account.Balance) + statedb.SetCode(addr, account.Code) + statedb.SetNonce(addr, account.Nonce) + for key, value := range account.Storage { + statedb.SetState(addr, key, value) + } + } + root := statedb.IntermediateRoot(false) + statedb.Commit(nil) + statedb.Database().TrieDB().Commit(root, true, nil) + log.Info("State pruning successful", "pruned", size, "elapsed", common.PrettyDuration(time.Since(start))) + return nil +} + func prune(snaptree *snapshot.Tree, root common.Hash, maindb ethdb.Database, stateBloom *stateBloom, bloomPath string, middleStateRoots map[common.Hash]struct{}, start time.Time) error { // Delete all stale trie nodes in the disk. With the help of state bloom // the trie nodes(and codes) belong to the active state will be filtered @@ -584,7 +685,7 @@ func RecoverPruning(datadir string, db ethdb.Database, trieCachePath string, tri // - The state HEAD is rewound already because of multiple incomplete `prune-state` // In this case, even the state HEAD is not exactly matched with snapshot, it // still feasible to recover the pruning correctly. - snaptree, err := snapshot.New(db, trie.NewDatabase(db), 256, int(triesInMemory), headBlock.Root(), false, false, true) + snaptree, err := snapshot.New(db, trie.NewDatabase(db), 256, int(triesInMemory), headBlock.Root(), false, false, true, false) if err != nil { return err // The relevant snapshot(s) might not exist } diff --git a/core/state/snapshot/journal.go b/core/state/snapshot/journal.go index 99b152b29b..883d947c90 100644 --- a/core/state/snapshot/journal.go +++ b/core/state/snapshot/journal.go @@ -149,7 +149,7 @@ func loadAndParseJournal(db ethdb.KeyValueStore, base *diskLayer) (snapshot, jou } // loadSnapshot loads a pre-existing state snapshot backed by a key-value store. -func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash, recovery bool) (snapshot, bool, error) { +func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash, recovery, withoutTrie bool) (snapshot, bool, error) { // If snapshotting is disabled (initial sync in progress), don't do anything, // wait for the chain to permit us to do something meaningful if rawdb.ReadSnapshotDisabled(diskdb) { @@ -168,6 +168,7 @@ func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root: baseRoot, } snapshot, generator, err := loadAndParseJournal(diskdb, base) + if err != nil { log.Warn("Failed to load new-format journal", "error", err) return nil, false, err @@ -181,6 +182,11 @@ func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, // which is below the snapshot. In this case the snapshot can be recovered // by re-executing blocks but right now it's unavailable. if head := snapshot.Root(); head != root { + log.Warn("Snapshot is not continuous with chain", "snaproot", head, "chainroot", root) + + if withoutTrie { + return snapshot, false, nil + } // If it's legacy snapshot, or it's new-format snapshot but // it's not in recovery mode, returns the error here for // rebuilding the entire snapshot forcibly. @@ -191,7 +197,6 @@ func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, // the disk layer is always higher than chain head. It can // be eventually recovered when the chain head beyonds the // disk layer. - log.Warn("Snapshot is not continuous with chain", "snaproot", head, "chainroot", root) } // Everything loaded correctly, resume any suspended operations if !generator.Done { diff --git a/core/state/snapshot/snapshot.go b/core/state/snapshot/snapshot.go index b1ffac82ba..8cd10408cc 100644 --- a/core/state/snapshot/snapshot.go +++ b/core/state/snapshot/snapshot.go @@ -203,7 +203,7 @@ type Tree struct { // This case happens when the snapshot is 'ahead' of the state trie. // - otherwise, the entire snapshot is considered invalid and will be recreated on // a background thread. -func New(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache, cap int, root common.Hash, async bool, rebuild bool, recovery bool) (*Tree, error) { +func New(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache, cap int, root common.Hash, async bool, rebuild bool, recovery, withoutTrie bool) (*Tree, error) { // Create a new, empty snapshot tree snap := &Tree{ diskdb: diskdb, @@ -216,7 +216,7 @@ func New(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache, cap int, root defer snap.waitBuild() } // Attempt to load a previously persisted snapshot and rebuild one if failed - head, disabled, err := loadSnapshot(diskdb, triedb, cache, root, recovery) + head, disabled, err := loadSnapshot(diskdb, triedb, cache, root, recovery, withoutTrie) if disabled { log.Warn("Snapshot maintenance disabled (syncing)") return snap, nil diff --git a/core/state/state_object.go b/core/state/state_object.go index dc57fc67ba..43289fa5bd 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -18,6 +18,7 @@ package state import ( "bytes" + "errors" "fmt" "io" "math/big" @@ -25,12 +26,15 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/state/snapshot" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/rlp" ) +const snapshotStaleRetryInterval = time.Millisecond * 10 + var emptyCodeHash = crypto.Keccak256(nil) type Code []byte @@ -267,7 +271,19 @@ func (s *StateObject) GetCommittedState(db Database, key common.Hash) common.Has } enc, err = s.db.snap.Storage(s.addrHash, crypto.Keccak256Hash(key.Bytes())) } - // If the snapshot is unavailable or reading from it fails, load from the database. + // ErrSnapshotStale may occur due to diff layers in the update, so we should try again in noTrie mode. + if s.db.NoTrie() && err != nil && errors.Is(err, snapshot.ErrSnapshotStale) { + // This error occurs when statedb.snaps.Cap changes the state of the merged difflayer + // to stale during the refresh of the difflayer, indicating that it is about to be discarded. + // Since the difflayer is refreshed in parallel, + // there is a small chance that the difflayer of the stale will be read while reading, + // resulting in an empty array being returned here. + // Therefore, noTrie mode must retry here, + // and add a time interval when retrying to avoid stacking too much and causing stack overflow. + time.Sleep(snapshotStaleRetryInterval) + return s.GetCommittedState(db, key) + } + // If snapshot unavailable or reading from it failed, load from the database if s.db.snap == nil || err != nil { if meter != nil { // If we already spent time checking the snapshot, account for it @@ -431,6 +447,13 @@ func (s *StateObject) updateTrie(db Database) Trie { // UpdateRoot sets the trie root to the current root hash of func (s *StateObject) updateRoot(db Database) { + // If node runs in no trie mode, set root to empty. + defer func() { + if db.NoTries() { + s.data.Root = common.Hash{} + } + }() + // If nothing changed, don't bother with hashing anything if s.updateTrie(db) == nil { return diff --git a/core/state/statedb.go b/core/state/statedb.go index ba7a238e72..68606b4d09 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -82,6 +82,7 @@ type StateDB struct { stateRoot common.Hash // The calculation result of IntermediateRoot trie Trie + noTrie bool hasher crypto.KeccakState diffLayer *types.DiffLayer diffTries map[common.Address]Trie @@ -178,6 +179,7 @@ func newStateDB(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, journal: newJournal(), hasher: crypto.NewKeccakState(), } + if sdb.snaps != nil { if sdb.snap = sdb.snaps.Snapshot(root); sdb.snap != nil { sdb.snapDestructs = make(map[common.Address]struct{}) @@ -192,6 +194,7 @@ func newStateDB(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, if err != nil && (sdb.snap == nil || snapVerified) { return nil, err } + _, sdb.noTrie = tr.(*trie.EmptyTrie) sdb.trie = tr return sdb, nil } @@ -206,6 +209,9 @@ func (s *StateDB) EnableWriteOnSharedStorage() { func (s *StateDB) StartPrefetcher(namespace string) { s.prefetcherLock.Lock() defer s.prefetcherLock.Unlock() + if s.noTrie { + return + } if s.prefetcher != nil { s.prefetcher.close() s.prefetcher = nil @@ -220,6 +226,9 @@ func (s *StateDB) StartPrefetcher(namespace string) { func (s *StateDB) StopPrefetcher() { s.prefetcherLock.Lock() defer s.prefetcherLock.Unlock() + if s.noTrie { + return + } if s.prefetcher != nil { s.prefetcher.close() s.prefetcher = nil @@ -264,6 +273,10 @@ func (s *StateDB) setError(err error) { } } +func (s *StateDB) NoTrie() bool { + return s.noTrie +} + func (s *StateDB) Error() error { return s.dbErr } @@ -563,6 +576,9 @@ func (s *StateDB) Suicide(addr common.Address) bool { // updateStateObject writes the given object to the trie. func (s *StateDB) updateStateObject(obj *StateObject) { + if s.noTrie { + return + } // Track the amount of time wasted on updating the account from the trie if metrics.EnabledExpensive { defer func(start time.Time) { s.AccountUpdates += time.Since(start) }(time.Now()) @@ -576,6 +592,9 @@ func (s *StateDB) updateStateObject(obj *StateObject) { // deleteStateObject removes the given object from the state trie. func (s *StateDB) deleteStateObject(obj *StateObject) { + if s.noTrie { + return + } // Track the amount of time wasted on deleting the account from the trie if metrics.EnabledExpensive { defer func(start time.Time) { s.AccountUpdates += time.Since(start) }(time.Now()) @@ -634,6 +653,20 @@ func (s *StateDB) getDeletedStateObject(addr common.Address) *StateObject { } } } + + // ErrSnapshotStale may occur due to diff layers in the update, so we should try again in noTrie mode. + if s.NoTrie() && err != nil && errors.Is(err, snapshot.ErrSnapshotStale) { + // This error occurs when statedb.snaps.Cap changes the state of the merged difflayer + // to stale during the refresh of the difflayer, indicating that it is about to be discarded. + // Since the difflayer is refreshed in parallel, + // there is a small chance that the difflayer of the stale will be read while reading, + // resulting in an empty array being returned here. + // Therefore, noTrie mode must retry here, + // and add a time interval when retrying to avoid stacking too much and causing OOM. + time.Sleep(snapshotStaleRetryInterval) + return s.getDeletedStateObject(addr) + } + // If snapshot unavailable or reading from it failed, load from the database if s.snap == nil || err != nil { if s.trie == nil { @@ -947,6 +980,7 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) { // It is called in between transactions to get the root hash that // goes into transaction receipts. func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { + // light process is not allowed when there is no trie if s.lightProcessed { s.StopPrefetcher() return s.trie.Hash() @@ -1114,18 +1148,18 @@ func (s *StateDB) StateIntermediateRoot() common.Hash { } usedAddrs := make([][]byte, 0, len(s.stateObjectsPending)) - for addr := range s.stateObjectsPending { - if obj := s.stateObjects[addr]; obj.deleted { - s.deleteStateObject(obj) - s.AccountDeleted += 1 - } else { - s.updateStateObject(obj) - s.AccountUpdated += 1 + if !s.noTrie { + for addr := range s.stateObjectsPending { + if obj := s.stateObjects[addr]; obj.deleted { + s.deleteStateObject(obj) + } else { + s.updateStateObject(obj) + } + usedAddrs = append(usedAddrs, common.CopyBytes(addr[:])) // Copy needed for closure + } + if prefetcher != nil { + prefetcher.used(s.originalRoot, usedAddrs) } - usedAddrs = append(usedAddrs, common.CopyBytes(addr[:])) // Copy needed for closure - } - if prefetcher != nil { - prefetcher.used(s.originalRoot, usedAddrs) } if len(s.stateObjectsPending) > 0 { @@ -1135,8 +1169,11 @@ func (s *StateDB) StateIntermediateRoot() common.Hash { if metrics.EnabledExpensive { defer func(start time.Time) { s.AccountHashes += time.Since(start) }(time.Now()) } - root := s.trie.Hash() - return root + if s.noTrie { + return s.expectedRoot + } else { + return s.trie.Hash() + } } // Prepare sets the current transaction hash and index which are @@ -1361,8 +1398,13 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er // Write any contract code associated with the state object tasks <- func() { // Write any storage changes in the state object to its storage trie - _, err := obj.CommitTrie(s.db) - taskResults <- err + if !s.noTrie { + if _, err := obj.CommitTrie(s.db); err != nil { + taskResults <- err + return + } + } + taskResults <- nil } tasksNum++ } @@ -1379,24 +1421,27 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er // The onleaf func is called _serially_, so we can reuse the same account // for unmarshalling every time. - var account types.StateAccount - root, _, err := s.trie.Commit(func(_ [][]byte, _ []byte, leaf []byte, parent common.Hash) error { - if err := rlp.DecodeBytes(leaf, &account); err != nil { + if !s.noTrie { + var account types.StateAccount + root, _, err := s.trie.Commit(func(_ [][]byte, _ []byte, leaf []byte, parent common.Hash) error { + if err := rlp.DecodeBytes(leaf, &account); err != nil { + return nil + } + if account.Root != emptyRoot { + s.db.TrieDB().Reference(account.Root, parent) + } return nil + }) + if err != nil { + return err } - if account.Root != emptyRoot { - s.db.TrieDB().Reference(account.Root, parent) + if root != emptyRoot { + s.db.CacheAccount(root, s.trie) } - return nil - }) - if err != nil { - return err - } - if root != emptyRoot { - s.db.CacheAccount(root, s.trie) } + for _, postFunc := range postCommitFuncs { - err = postFunc() + err := postFunc() if err != nil { return err } diff --git a/core/state_prefetcher.go b/core/state_prefetcher.go index 701c4c159d..3add16b18c 100644 --- a/core/state_prefetcher.go +++ b/core/state_prefetcher.go @@ -127,23 +127,21 @@ func (p *statePrefetcher) PrefetchMining(txs *types.TransactionsByPriceAndNonce, go func(txset *types.TransactionsByPriceAndNonce) { count := 0 for { - tx := txset.Peek() - if tx == nil { - return - } select { case <-interruptCh: return default: - } - if count++; count%checkInterval == 0 { - if *txCurr == nil { + if count++; count%checkInterval == 0 { + txset.Forward(*txCurr) + } + tx := txset.Peek() + if tx == nil { return } - txset.Forward(*txCurr) + txCh <- tx + txset.Shift() + } - txCh <- tx - txset.Shift() } }(txs) } diff --git a/core/state_processor.go b/core/state_processor.go index 6190821bd8..3715730770 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -118,7 +118,7 @@ func (p *LightStateProcessor) Process(block *types.Block, statedb *state.StateDB return statedb, receipts, logs, gasUsed, nil } log.Error("do light process err at block", "num", block.NumberU64(), "err", err) - p.bc.removeDiffLayers(diffLayer.DiffHash) + p.bc.removeDiffLayers(diffLayer.DiffHash.Load().(common.Hash)) // prepare new statedb statedb.StopPrefetcher() parent := p.bc.GetHeader(block.ParentHash(), block.NumberU64()-1) @@ -409,6 +409,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg // usually do have two tx, one for validator set contract, another for system reward contract. systemTxs := make([]*types.Transaction, 0, 2) + for i, tx := range block.Transactions() { if isPoSA { if isSystemTx, err := posa.IsSystemTransaction(tx, block.Header()); err != nil { @@ -432,7 +433,6 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg bloomProcessors.Close() return statedb, nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err) } - commonTxs = append(commonTxs, tx) receipts = append(receipts, receipt) } diff --git a/core/types.go b/core/types.go index c9061233e6..db8d7a854f 100644 --- a/core/types.go +++ b/core/types.go @@ -32,6 +32,8 @@ type Validator interface { // ValidateState validates the given statedb and optionally the receipts and // gas used. ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64) error + // RemoteVerifyManager return remoteVerifyManager of validator. + RemoteVerifyManager() *remoteVerifyManager } // Prefetcher is an interface for pre-caching transaction signatures and state. diff --git a/core/types/block.go b/core/types/block.go index 711587fa1d..59c6adf2a9 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -40,6 +40,32 @@ var ( EmptyUncleHash = rlpHash([]*Header(nil)) ) +type VerifyStatus struct { + Code uint16 + Msg string +} + +var ( + // StatusVerified means the processing of request going as expected and found the root correctly. + StatusVerified = VerifyStatus{Code: 0x100} + StatusFullVerified = VerifyStatus{Code: 0x101, Msg: "state root full verified"} + StatusPartiallyVerified = VerifyStatus{Code: 0x102, Msg: "state root partially verified, because of difflayer not found"} + + // StatusFailed means the request has something wrong. + StatusFailed = VerifyStatus{Code: 0x200} + StatusDiffHashMismatch = VerifyStatus{Code: 0x201, Msg: "verify failed because of blockhash mismatch with diffhash"} + StatusImpossibleFork = VerifyStatus{Code: 0x202, Msg: "verify failed because of impossible fork detected"} + + // StatusUncertain means verify node can't give a certain result of the request. + StatusUncertain = VerifyStatus{Code: 0x300} + StatusBlockTooNew = VerifyStatus{Code: 0x301, Msg: "can’t verify because of block number larger than current height more than 11"} + StatusBlockNewer = VerifyStatus{Code: 0x302, Msg: "can’t verify because of block number larger than current height"} + StatusPossibleFork = VerifyStatus{Code: 0x303, Msg: "can’t verify because of possible fork detected"} + + // StatusUnexpectedError is unexpected internal error. + StatusUnexpectedError = VerifyStatus{Code: 0x400, Msg: "can’t verify because of unexpected internal error"} +) + // A BlockNonce is a 64-bit hash which proves (combined with the // mix-hash) that a sufficient amount of computation has been carried // out on a block. @@ -423,10 +449,10 @@ type DiffLayer struct { Accounts []DiffAccount Storages []DiffStorage - DiffHash common.Hash + DiffHash atomic.Value } -type extDiffLayer struct { +type ExtDiffLayer struct { BlockHash common.Hash Number uint64 Receipts []*ReceiptForStorage // Receipts are duplicated stored to simplify the logic @@ -438,7 +464,7 @@ type extDiffLayer struct { // DecodeRLP decodes the Ethereum func (d *DiffLayer) DecodeRLP(s *rlp.Stream) error { - var ed extDiffLayer + var ed ExtDiffLayer if err := s.Decode(&ed); err != nil { return err } @@ -458,7 +484,7 @@ func (d *DiffLayer) EncodeRLP(w io.Writer) error { for i, receipt := range d.Receipts { storageReceipts[i] = (*ReceiptForStorage)(receipt) } - return rlp.Encode(w, extDiffLayer{ + return rlp.Encode(w, ExtDiffLayer{ BlockHash: d.BlockHash, Number: d.Number, Receipts: storageReceipts, @@ -497,6 +523,13 @@ type DiffStorage struct { Vals [][]byte } +func (storage *DiffStorage) Len() int { return len(storage.Keys) } +func (storage *DiffStorage) Swap(i, j int) { + storage.Keys[i], storage.Keys[j] = storage.Keys[j], storage.Keys[i] + storage.Vals[i], storage.Vals[j] = storage.Vals[j], storage.Vals[i] +} +func (storage *DiffStorage) Less(i, j int) bool { return storage.Keys[i] < storage.Keys[j] } + type DiffAccountsInTx struct { TxHash common.Hash Accounts map[common.Address]*big.Int diff --git a/core/types/transaction.go b/core/types/transaction.go index 612e4df847..1e6c45cd7f 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -596,7 +596,9 @@ func (t *TransactionsByPriceAndNonce) CurrentSize() int { //Forward moves current transaction to be the one which is one index after tx func (t *TransactionsByPriceAndNonce) Forward(tx *Transaction) { if tx == nil { - t.heads = t.heads[0:0] + if len(t.heads) > 0 { + t.heads = t.heads[0:0] + } return } //check whether target tx exists in t.heads diff --git a/core/types/transaction_test.go b/core/types/transaction_test.go index 86c0dffc2c..e88dd3e852 100644 --- a/core/types/transaction_test.go +++ b/core/types/transaction_test.go @@ -438,7 +438,7 @@ func TestTransactionForward(t *testing.T) { } tmp := txset.Copy() - for j := 0; j < 10; j++ { + for j := 0; j < 11; j++ { txset = tmp.Copy() txsetCpy = tmp.Copy() i := 0 @@ -446,6 +446,9 @@ func TestTransactionForward(t *testing.T) { txset.Shift() } tx := txset.Peek() + if tx == nil { + continue + } txsetCpy.Forward(tx) txCpy := txsetCpy.Peek() if txCpy == nil { diff --git a/eth/backend.go b/eth/backend.go index 6b286d0c32..06e01be750 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -46,6 +46,7 @@ import ( "github.com/ethereum/go-ethereum/eth/protocols/diff" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" + "github.com/ethereum/go-ethereum/eth/protocols/trust" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/internal/ethapi" @@ -70,12 +71,13 @@ type Ethereum struct { config *ethconfig.Config // Handlers - txPool *core.TxPool - blockchain *core.BlockChain - handler *handler - ethDialCandidates enode.Iterator - snapDialCandidates enode.Iterator - merger *consensus.Merger + txPool *core.TxPool + blockchain *core.BlockChain + handler *handler + ethDialCandidates enode.Iterator + snapDialCandidates enode.Iterator + trustDialCandidates enode.Iterator + merger *consensus.Merger // DB interfaces chainDb ethdb.Database // Block chain database @@ -114,6 +116,9 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { if !config.SyncMode.IsValid() { return nil, fmt.Errorf("invalid sync mode %d", config.SyncMode) } + if !config.TriesVerifyMode.IsValid() { + return nil, fmt.Errorf("invalid tries verify mode %d", config.TriesVerifyMode) + } if config.Miner.GasPrice == nil || config.Miner.GasPrice.Cmp(common.Big0) <= 0 { log.Warn("Sanitizing invalid miner gas price", "provided", config.Miner.GasPrice, "updated", ethconfig.Defaults.Miner.GasPrice) config.Miner.GasPrice = new(big.Int).Set(ethconfig.Defaults.Miner.GasPrice) @@ -200,14 +205,14 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { TrieDirtyLimit: config.TrieDirtyCache, TrieDirtyDisabled: config.NoPruning, TrieTimeLimit: config.TrieTimeout, + NoTries: config.TriesVerifyMode != core.LocalVerify, SnapshotLimit: config.SnapshotCache, TriesInMemory: config.TriesInMemory, Preimages: config.Preimages, } ) bcOps := make([]core.BlockChainOption, 0) - // TODO diffsync performance is not as expected, disable it when pipecommit is enabled for now - if config.DiffSync && !config.PipeCommit { + if config.DiffSync && !config.PipeCommit && config.TriesVerifyMode == core.LocalVerify { bcOps = append(bcOps, core.EnableLightProcessor) } if config.PipeCommit { @@ -216,6 +221,9 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { if config.PersistDiff { bcOps = append(bcOps, core.EnablePersistDiff(config.DiffBlock)) } + + peers := newPeerSet() + bcOps = append(bcOps, core.EnableBlockValidator(chainConfig, eth.engine, config.TriesVerifyMode, peers)) eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, chainConfig, eth.engine, vmConfig, eth.shouldPreserve, &config.TxLookupLimit, bcOps...) if err != nil { return nil, err @@ -254,6 +262,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { DirectBroadcast: config.DirectBroadcast, DiffSync: config.DiffSync, DisablePeerTxBroadcast: config.DisablePeerTxBroadcast, + PeerSet: peers, }); err != nil { return nil, err } @@ -277,6 +286,10 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { if err != nil { return nil, err } + eth.trustDialCandidates, err = dnsclient.NewIterator(eth.config.TrustDiscoveryURLs...) + if err != nil { + return nil, err + } // Start the RPC service eth.netRPCService = ethapi.NewPublicNetAPI(eth.p2pServer, config.NetworkId) @@ -565,7 +578,12 @@ func (s *Ethereum) Protocols() []p2p.Protocol { protos = append(protos, snap.MakeProtocols((*snapHandler)(s.handler), s.snapDialCandidates)...) } // diff protocol can still open without snap protocol - protos = append(protos, diff.MakeProtocols((*diffHandler)(s.handler), s.snapDialCandidates)...) + if !s.config.DisableDiffProtocol { + protos = append(protos, diff.MakeProtocols((*diffHandler)(s.handler), s.snapDialCandidates)...) + } + if s.config.EnableTrustProtocol { + protos = append(protos, trust.MakeProtocols((*trustHandler)(s.handler), s.snapDialCandidates)...) + } return protos } @@ -599,6 +617,7 @@ func (s *Ethereum) Stop() error { // Stop all the peer-related stuff first. s.ethDialCandidates.Close() s.snapDialCandidates.Close() + s.trustDialCandidates.Close() s.handler.Stop() // Then stop everything else. diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index e0ed4fc320..207ed39915 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -27,6 +27,7 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state/snapshot" "github.com/ethereum/go-ethereum/core/types" @@ -1451,6 +1452,9 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error { // of the blocks delivered from the downloader, and the indexing will be off. log.Debug("Downloaded item processing failed on sidechain import", "index", index, "err", err) } + if errors.Is(err, core.ErrAncestorHasNotBeenVerified) { + return err + } return fmt.Errorf("%w: %v", errInvalidChain, err) } return nil diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 295b1c49c1..7e29db0fcc 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -82,6 +82,7 @@ var Defaults = Config{ TrieDirtyCache: 256, TrieTimeout: 60 * time.Minute, TriesInMemory: 128, + TriesVerifyMode: core.LocalVerify, SnapshotCache: 102, DiffBlock: uint64(86400), Miner: miner.Config{ @@ -133,12 +134,15 @@ type Config struct { // This can be set to list of enrtree:// URLs which will be queried for // for nodes to connect to. - EthDiscoveryURLs []string - SnapDiscoveryURLs []string + EthDiscoveryURLs []string + SnapDiscoveryURLs []string + TrustDiscoveryURLs []string NoPruning bool // Whether to disable pruning and flush everything to disk DirectBroadcast bool DisableSnapProtocol bool //Whether disable snap protocol + DisableDiffProtocol bool //Whether disable diff protocol + EnableTrustProtocol bool //Whether enable trust protocol DiffSync bool // Whether support diff sync PipeCommit bool RangeLimit bool @@ -178,6 +182,7 @@ type Config struct { TrieTimeout time.Duration SnapshotCache int TriesInMemory uint64 + TriesVerifyMode core.VerifyMode Preimages bool // Mining options diff --git a/eth/ethconfig/gen_config.go b/eth/ethconfig/gen_config.go index e26ffa5222..2e5899eae6 100644 --- a/eth/ethconfig/gen_config.go +++ b/eth/ethconfig/gen_config.go @@ -18,45 +18,53 @@ import ( // MarshalTOML marshals as TOML. func (c Config) MarshalTOML() (interface{}, error) { type Config struct { - Genesis *core.Genesis `toml:",omitempty"` - NetworkId uint64 - SyncMode downloader.SyncMode - DisablePeerTxBroadcast bool - EthDiscoveryURLs []string - SnapDiscoveryURLs []string - NoPruning bool - NoPrefetch bool - TxLookupLimit uint64 `toml:",omitempty"` - Whitelist map[uint64]common.Hash `toml:"-"` - LightServ int `toml:",omitempty"` - LightIngress int `toml:",omitempty"` - LightEgress int `toml:",omitempty"` - LightPeers int `toml:",omitempty"` - LightNoPrune bool `toml:",omitempty"` - LightNoSyncServe bool `toml:",omitempty"` - SyncFromCheckpoint bool `toml:",omitempty"` - UltraLightServers []string `toml:",omitempty"` - UltraLightFraction int `toml:",omitempty"` - UltraLightOnlyAnnounce bool `toml:",omitempty"` - SkipBcVersionCheck bool `toml:"-"` - DatabaseHandles int `toml:"-"` - DatabaseCache int - DatabaseFreezer string - DatabaseDiff string - TrieCleanCache int - TrieCleanCacheJournal string `toml:",omitempty"` - TrieCleanCacheRejournal time.Duration `toml:",omitempty"` - TrieDirtyCache int - TrieTimeout time.Duration - TriesInMemory uint64 `toml:",omitempty"` - SnapshotCache int - Preimages bool - PersistDiff bool - DiffBlock uint64 `toml:",omitempty"` - Miner miner.Config - Ethash ethash.Config - TxPool core.TxPoolConfig - GPO gasprice.Config + Genesis *core.Genesis `toml:",omitempty"` + NetworkId uint64 + SyncMode downloader.SyncMode + DisablePeerTxBroadcast bool + EthDiscoveryURLs []string + SnapDiscoveryURLs []string + TrustDiscoveryURLs []string + NoPruning bool + NoPrefetch bool + DirectBroadcast bool + DisableSnapProtocol bool + DisableDiffProtocol bool + EnableTrustProtocol bool + DiffSync bool + RangeLimit bool + TxLookupLimit uint64 `toml:",omitempty"` + Whitelist map[uint64]common.Hash `toml:"-"` + LightServ int `toml:",omitempty"` + LightIngress int `toml:",omitempty"` + LightEgress int `toml:",omitempty"` + LightPeers int `toml:",omitempty"` + LightNoPrune bool `toml:",omitempty"` + LightNoSyncServe bool `toml:",omitempty"` + SyncFromCheckpoint bool `toml:",omitempty"` + UltraLightServers []string `toml:",omitempty"` + UltraLightFraction int `toml:",omitempty"` + UltraLightOnlyAnnounce bool `toml:",omitempty"` + SkipBcVersionCheck bool `toml:"-"` + DatabaseHandles int `toml:"-"` + DatabaseCache int + DatabaseFreezer string + DatabaseDiff string + PersistDiff bool + DiffBlock uint64 + TrieCleanCache int + TrieCleanCacheJournal string `toml:",omitempty"` + TrieCleanCacheRejournal time.Duration `toml:",omitempty"` + TrieDirtyCache int + TrieTimeout time.Duration + SnapshotCache int + TriesInMemory uint64 + TriesVerifyMode core.VerifyMode + Preimages bool + Miner miner.Config + Ethash ethash.Config `toml:",omitempty"` + TxPool core.TxPoolConfig + GPO gasprice.Config EnablePreimageRecording bool DocRoot string `toml:"-"` RPCGasCap uint64 @@ -75,7 +83,14 @@ func (c Config) MarshalTOML() (interface{}, error) { enc.DisablePeerTxBroadcast = c.DisablePeerTxBroadcast enc.EthDiscoveryURLs = c.EthDiscoveryURLs enc.SnapDiscoveryURLs = c.SnapDiscoveryURLs + enc.TrustDiscoveryURLs = c.TrustDiscoveryURLs enc.NoPruning = c.NoPruning + enc.DirectBroadcast = c.DirectBroadcast + enc.DisableSnapProtocol = c.DisableSnapProtocol + enc.DisableDiffProtocol = c.DisableDiffProtocol + enc.EnableTrustProtocol = c.EnableTrustProtocol + enc.DiffSync = c.DiffSync + enc.RangeLimit = c.RangeLimit enc.TxLookupLimit = c.TxLookupLimit enc.Whitelist = c.Whitelist enc.LightServ = c.LightServ @@ -93,16 +108,17 @@ func (c Config) MarshalTOML() (interface{}, error) { enc.DatabaseCache = c.DatabaseCache enc.DatabaseFreezer = c.DatabaseFreezer enc.DatabaseDiff = c.DatabaseDiff + enc.PersistDiff = c.PersistDiff + enc.DiffBlock = c.DiffBlock enc.TrieCleanCache = c.TrieCleanCache enc.TrieCleanCacheJournal = c.TrieCleanCacheJournal enc.TrieCleanCacheRejournal = c.TrieCleanCacheRejournal enc.TrieDirtyCache = c.TrieDirtyCache enc.TrieTimeout = c.TrieTimeout - enc.TriesInMemory = c.TriesInMemory enc.SnapshotCache = c.SnapshotCache + enc.TriesInMemory = c.TriesInMemory + enc.TriesVerifyMode = c.TriesVerifyMode enc.Preimages = c.Preimages - enc.PersistDiff = c.PersistDiff - enc.DiffBlock = c.DiffBlock enc.Miner = c.Miner enc.Ethash = c.Ethash enc.TxPool = c.TxPool @@ -123,45 +139,53 @@ func (c Config) MarshalTOML() (interface{}, error) { // UnmarshalTOML unmarshals from TOML. func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { type Config struct { - Genesis *core.Genesis `toml:",omitempty"` - NetworkId *uint64 - SyncMode *downloader.SyncMode - DisablePeerTxBroadcast *bool - EthDiscoveryURLs []string - SnapDiscoveryURLs []string - NoPruning *bool - NoPrefetch *bool - TxLookupLimit *uint64 `toml:",omitempty"` - Whitelist map[uint64]common.Hash `toml:"-"` - LightServ *int `toml:",omitempty"` - LightIngress *int `toml:",omitempty"` - LightEgress *int `toml:",omitempty"` - LightPeers *int `toml:",omitempty"` - LightNoPrune *bool `toml:",omitempty"` - LightNoSyncServe *bool `toml:",omitempty"` - SyncFromCheckpoint *bool `toml:",omitempty"` - UltraLightServers []string `toml:",omitempty"` - UltraLightFraction *int `toml:",omitempty"` - UltraLightOnlyAnnounce *bool `toml:",omitempty"` - SkipBcVersionCheck *bool `toml:"-"` - DatabaseHandles *int `toml:"-"` - DatabaseCache *int - DatabaseFreezer *string - DatabaseDiff *string - PersistDiff *bool - DiffBlock *uint64 `toml:",omitempty"` - TrieCleanCache *int - TrieCleanCacheJournal *string `toml:",omitempty"` - TrieCleanCacheRejournal *time.Duration `toml:",omitempty"` - TrieDirtyCache *int - TrieTimeout *time.Duration - TriesInMemory *uint64 `toml:",omitempty"` - SnapshotCache *int - Preimages *bool - Miner *miner.Config - Ethash *ethash.Config - TxPool *core.TxPoolConfig - GPO *gasprice.Config + Genesis *core.Genesis `toml:",omitempty"` + NetworkId *uint64 + SyncMode *downloader.SyncMode + DisablePeerTxBroadcast *bool + EthDiscoveryURLs []string + SnapDiscoveryURLs []string + TrustDiscoveryURLs []string + NoPruning *bool + NoPrefetch *bool + DirectBroadcast *bool + DisableSnapProtocol *bool + DisableDiffProtocol *bool + EnableTrustProtocol *bool + DiffSync *bool + RangeLimit *bool + TxLookupLimit *uint64 `toml:",omitempty"` + Whitelist map[uint64]common.Hash `toml:"-"` + LightServ *int `toml:",omitempty"` + LightIngress *int `toml:",omitempty"` + LightEgress *int `toml:",omitempty"` + LightPeers *int `toml:",omitempty"` + LightNoPrune *bool `toml:",omitempty"` + LightNoSyncServe *bool `toml:",omitempty"` + SyncFromCheckpoint *bool `toml:",omitempty"` + UltraLightServers []string `toml:",omitempty"` + UltraLightFraction *int `toml:",omitempty"` + UltraLightOnlyAnnounce *bool `toml:",omitempty"` + SkipBcVersionCheck *bool `toml:"-"` + DatabaseHandles *int `toml:"-"` + DatabaseCache *int + DatabaseFreezer *string + DatabaseDiff *string + PersistDiff *bool + DiffBlock *uint64 + TrieCleanCache *int + TrieCleanCacheJournal *string `toml:",omitempty"` + TrieCleanCacheRejournal *time.Duration `toml:",omitempty"` + TrieDirtyCache *int + TrieTimeout *time.Duration + SnapshotCache *int + TriesInMemory *uint64 + TriesVerifyMode *core.VerifyMode + Preimages *bool + Miner *miner.Config + Ethash *ethash.Config `toml:",omitempty"` + TxPool *core.TxPoolConfig + GPO *gasprice.Config EnablePreimageRecording *bool DocRoot *string `toml:"-"` RPCGasCap *uint64 @@ -195,9 +219,30 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { if dec.SnapDiscoveryURLs != nil { c.SnapDiscoveryURLs = dec.SnapDiscoveryURLs } + if dec.TrustDiscoveryURLs != nil { + c.TrustDiscoveryURLs = dec.TrustDiscoveryURLs + } if dec.NoPruning != nil { c.NoPruning = *dec.NoPruning } + if dec.DirectBroadcast != nil { + c.DirectBroadcast = *dec.DirectBroadcast + } + if dec.DisableSnapProtocol != nil { + c.DisableSnapProtocol = *dec.DisableSnapProtocol + } + if dec.DisableDiffProtocol != nil { + c.DisableDiffProtocol = *dec.DisableDiffProtocol + } + if dec.EnableTrustProtocol != nil { + c.EnableTrustProtocol = *dec.EnableTrustProtocol + } + if dec.DiffSync != nil { + c.DiffSync = *dec.DiffSync + } + if dec.RangeLimit != nil { + c.RangeLimit = *dec.RangeLimit + } if dec.TxLookupLimit != nil { c.TxLookupLimit = *dec.TxLookupLimit } @@ -270,11 +315,14 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { if dec.TrieTimeout != nil { c.TrieTimeout = *dec.TrieTimeout } + if dec.SnapshotCache != nil { + c.SnapshotCache = *dec.SnapshotCache + } if dec.TriesInMemory != nil { c.TriesInMemory = *dec.TriesInMemory } - if dec.SnapshotCache != nil { - c.SnapshotCache = *dec.SnapshotCache + if dec.TriesVerifyMode != nil { + c.TriesVerifyMode = *dec.TriesVerifyMode } if dec.Preimages != nil { c.Preimages = *dec.Preimages diff --git a/eth/handler.go b/eth/handler.go index 41d523e574..88ec4833bb 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -35,6 +35,7 @@ import ( "github.com/ethereum/go-ethereum/eth/protocols/diff" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" + "github.com/ethereum/go-ethereum/eth/protocols/trust" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" @@ -96,6 +97,7 @@ type handlerConfig struct { Whitelist map[uint64]common.Hash // Hard coded whitelist for sync challenged DirectBroadcast bool DisablePeerTxBroadcast bool + PeerSet *peerSet } type handler struct { @@ -145,6 +147,9 @@ func newHandler(config *handlerConfig) (*handler, error) { if config.EventMux == nil { config.EventMux = new(event.TypeMux) // Nicety initialization for tests } + if config.PeerSet == nil { + config.PeerSet = newPeerSet() // Nicety initialization for tests + } h := &handler{ networkID: config.Network, forkFilter: forkid.NewFilter(config.Chain), @@ -153,7 +158,7 @@ func newHandler(config *handlerConfig) (*handler, error) { database: config.Database, txpool: config.TxPool, chain: config.Chain, - peers: newPeerSet(), + peers: config.PeerSet, merger: config.Merger, whitelist: config.Whitelist, directBroadcast: config.DirectBroadcast, @@ -311,6 +316,11 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { peer.Log().Error("Diff extension barrier failed", "err", err) return err } + trust, err := h.peers.waitTrustExtension(peer) + if err != nil { + peer.Log().Error("Trust extension barrier failed", "err", err) + return err + } // TODO(karalabe): Not sure why this is needed if !h.chainSync.handlePeerEvent(peer) { return p2p.DiscQuitting @@ -351,7 +361,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { peer.Log().Debug("Ethereum peer connected", "name", peer.Name()) // Register the peer locally - if err := h.peers.registerPeer(peer, snap, diff); err != nil { + if err := h.peers.registerPeer(peer, snap, diff, trust); err != nil { peer.Log().Error("Ethereum peer registration failed", "err", err) return err } @@ -500,6 +510,21 @@ func (h *handler) runDiffExtension(peer *diff.Peer, handler diff.Handler) error return handler(peer) } +// runTrustExtension registers a `trust` peer into the joint eth/trust peerset and +// starts handling inbound messages. As `trust` is only a satellite protocol to +// `eth`, all subsystem registrations and lifecycle management will be done by +// the main `eth` handler to prevent strange races. +func (h *handler) runTrustExtension(peer *trust.Peer, handler trust.Handler) error { + h.peerWG.Add(1) + defer h.peerWG.Done() + + if err := h.peers.registerTrustExtension(peer); err != nil { + peer.Log().Error("Trust extension registration failed", "err", err) + return err + } + return handler(peer) +} + // removePeer requests disconnection of a peer. func (h *handler) removePeer(id string) { peer := h.peers.peer(id) diff --git a/eth/handler_trust.go b/eth/handler_trust.go new file mode 100644 index 0000000000..0b116b9255 --- /dev/null +++ b/eth/handler_trust.go @@ -0,0 +1,52 @@ +package eth + +import ( + "fmt" + + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/eth/protocols/trust" + "github.com/ethereum/go-ethereum/p2p/enode" +) + +// trustHandler implements the trust.Backend interface to handle the various network +// packets that are sent as replies or broadcasts. +type trustHandler handler + +func (h *trustHandler) Chain() *core.BlockChain { return h.chain } + +// RunPeer is invoked when a peer joins on the `snap` protocol. +func (h *trustHandler) RunPeer(peer *trust.Peer, hand trust.Handler) error { + return (*handler)(h).runTrustExtension(peer, hand) +} + +// PeerInfo retrieves all known `trust` information about a peer. +func (h *trustHandler) PeerInfo(id enode.ID) interface{} { + if p := h.peers.peer(id.String()); p != nil { + if p.trustExt != nil { + return p.trustExt.info() + } + } + return nil +} + +// Handle is invoked from a peer's message handler when it receives a new remote +// message that the handler couldn't consume and serve itself. +func (h *trustHandler) Handle(peer *trust.Peer, packet trust.Packet) error { + switch packet := packet.(type) { + case *trust.RootResponsePacket: + verifyResult := &core.VerifyResult{ + Status: packet.Status, + BlockNumber: packet.BlockNumber, + BlockHash: packet.BlockHash, + Root: packet.Root, + } + if vm := h.Chain().Validator().RemoteVerifyManager(); vm != nil { + vm.HandleRootResponse(verifyResult, peer.ID()) + return nil + } + return fmt.Errorf("verify manager is nil which is unexpected") + + default: + return fmt.Errorf("unexpected trust packet type: %T", packet) + } +} diff --git a/eth/peer.go b/eth/peer.go index 83acf98e9f..802d1e5aac 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -20,6 +20,8 @@ import ( "math/big" "time" + "github.com/ethereum/go-ethereum/eth/protocols/trust" + "github.com/ethereum/go-ethereum/eth/protocols/diff" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" @@ -36,8 +38,9 @@ type ethPeerInfo struct { // ethPeer is a wrapper around eth.Peer to maintain a few extra metadata. type ethPeer struct { *eth.Peer - snapExt *snapPeer // Satellite `snap` connection - diffExt *diffPeer + snapExt *snapPeer // Satellite `snap` connection + diffExt *diffPeer + trustExt *trustPeer syncDrop *time.Timer // Connection dropper if `eth` sync progress isn't validated in time snapWait chan struct{} // Notification channel for snap connections @@ -67,6 +70,12 @@ type diffPeerInfo struct { DiffSync bool `json:"diff_sync"` } +// trustPeerInfo represents a short summary of the `trust` sub-protocol metadata known +// about a connected peer. +type trustPeerInfo struct { + Version uint `json:"version"` // Trust protocol version negotiated +} + // snapPeer is a wrapper around snap.Peer to maintain a few extra metadata. type snapPeer struct { *snap.Peer @@ -77,6 +86,11 @@ type diffPeer struct { *diff.Peer } +// trustPeer is a wrapper around trust.Peer to maintain a few extra metadata. +type trustPeer struct { + *trust.Peer +} + // info gathers and returns some `diff` protocol metadata known about a peer. func (p *diffPeer) info() *diffPeerInfo { return &diffPeerInfo{ @@ -91,3 +105,10 @@ func (p *snapPeer) info() *snapPeerInfo { Version: p.Version(), } } + +// info gathers and returns some `trust` protocol metadata known about a peer. +func (p *trustPeer) info() *trustPeerInfo { + return &trustPeerInfo{ + Version: p.Version(), + } +} diff --git a/eth/peerset.go b/eth/peerset.go index 0f5245a05e..b68d0e7783 100644 --- a/eth/peerset.go +++ b/eth/peerset.go @@ -23,10 +23,12 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/protocols/diff" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" + "github.com/ethereum/go-ethereum/eth/protocols/trust" "github.com/ethereum/go-ethereum/p2p" ) @@ -53,6 +55,10 @@ var ( // errDiffWithoutEth is returned if a peer attempts to connect only on the // diff protocol without advertising the eth main protocol. errDiffWithoutEth = errors.New("peer connected on diff without compatible eth support") + + // errTrustWithoutEth is returned if a peer attempts to connect only on the + // trust protocol without advertising the eth main protocol. + errTrustWithoutEth = errors.New("peer connected on trust without compatible eth support") ) const ( @@ -73,6 +79,9 @@ type peerSet struct { diffWait map[string]chan *diff.Peer // Peers connected on `eth` waiting for their diff extension diffPend map[string]*diff.Peer // Peers connected on the `diff` protocol, but not yet on `eth` + trustWait map[string]chan *trust.Peer // Peers connected on `eth` waiting for their trust extension + trustPend map[string]*trust.Peer // Peers connected on the `trust` protocol, but not yet on `eth` + lock sync.RWMutex closed bool } @@ -80,11 +89,13 @@ type peerSet struct { // newPeerSet creates a new peer set to track the active participants. func newPeerSet() *peerSet { return &peerSet{ - peers: make(map[string]*ethPeer), - snapWait: make(map[string]chan *snap.Peer), - snapPend: make(map[string]*snap.Peer), - diffWait: make(map[string]chan *diff.Peer), - diffPend: make(map[string]*diff.Peer), + peers: make(map[string]*ethPeer), + snapWait: make(map[string]chan *snap.Peer), + snapPend: make(map[string]*snap.Peer), + diffWait: make(map[string]chan *diff.Peer), + diffPend: make(map[string]*diff.Peer), + trustWait: make(map[string]chan *trust.Peer), + trustPend: make(map[string]*trust.Peer), } } @@ -148,6 +159,40 @@ func (ps *peerSet) registerDiffExtension(peer *diff.Peer) error { return nil } +// registerTrustExtension unblocks an already connected `eth` peer waiting for its +// `trust` extension, or if no such peer exists, tracks the extension for the time +// being until the `eth` main protocol starts looking for it. +func (ps *peerSet) registerTrustExtension(peer *trust.Peer) error { + // Reject the peer if it advertises `trust` without `eth` as `trust` is only a + // satellite protocol meaningful with the chain selection of `eth` + if !peer.RunningCap(eth.ProtocolName, eth.ProtocolVersions) { + return errTrustWithoutEth + } + // If the peer isn't verify node, don't register trust extension into eth protocol. + if !peer.VerifyNode() { + return nil + } + // Ensure nobody can double connect + ps.lock.Lock() + defer ps.lock.Unlock() + + id := peer.ID() + if _, ok := ps.peers[id]; ok { + return errPeerAlreadyRegistered // avoid connections with the same id as existing ones + } + if _, ok := ps.trustPend[id]; ok { + return errPeerAlreadyRegistered // avoid connections with the same id as pending ones + } + // Inject the peer into an `eth` counterpart is available, otherwise save for later + if wait, ok := ps.trustWait[id]; ok { + delete(ps.trustWait, id) + wait <- peer + return nil + } + ps.trustPend[id] = peer + return nil +} + // waitExtensions blocks until all satellite protocols are connected and tracked // by the peerset. func (ps *peerSet) waitSnapExtension(peer *eth.Peer) (*snap.Peer, error) { @@ -234,6 +279,53 @@ func (ps *peerSet) waitDiffExtension(peer *eth.Peer) (*diff.Peer, error) { } } +// waitTrustExtension blocks until all satellite protocols are connected and tracked +// by the peerset. +func (ps *peerSet) waitTrustExtension(peer *eth.Peer) (*trust.Peer, error) { + // If the peer does not support a compatible `trust`, don't wait + if !peer.RunningCap(trust.ProtocolName, trust.ProtocolVersions) { + return nil, nil + } + // If the peer isn't verify node, don't register trust extension into eth protocol. + if !peer.VerifyNode() { + return nil, nil + } + // Ensure nobody can double connect + ps.lock.Lock() + + id := peer.ID() + if _, ok := ps.peers[id]; ok { + ps.lock.Unlock() + return nil, errPeerAlreadyRegistered // avoid connections with the same id as existing ones + } + if _, ok := ps.trustWait[id]; ok { + ps.lock.Unlock() + return nil, errPeerAlreadyRegistered // avoid connections with the same id as pending ones + } + // If `trust` already connected, retrieve the peer from the pending set + if trust, ok := ps.trustPend[id]; ok { + delete(ps.trustPend, id) + + ps.lock.Unlock() + return trust, nil + } + // Otherwise wait for `trust` to connect concurrently + wait := make(chan *trust.Peer) + ps.trustWait[id] = wait + ps.lock.Unlock() + + select { + case peer := <-wait: + return peer, nil + + case <-time.After(extensionWaitTimeout): + ps.lock.Lock() + delete(ps.trustWait, id) + ps.lock.Unlock() + return nil, errPeerWaitTimeout + } +} + func (ps *peerSet) GetDiffPeer(pid string) downloader.IDiffPeer { if p := ps.peer(pid); p != nil && p.diffExt != nil { return p.diffExt @@ -241,9 +333,23 @@ func (ps *peerSet) GetDiffPeer(pid string) downloader.IDiffPeer { return nil } +// GetVerifyPeers returns an array of verify nodes. +func (ps *peerSet) GetVerifyPeers() []core.VerifyPeer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + res := make([]core.VerifyPeer, 0) + for _, p := range ps.peers { + if p.trustExt != nil && p.trustExt.Peer != nil { + res = append(res, p.trustExt.Peer) + } + } + return res +} + // registerPeer injects a new `eth` peer into the working set, or returns an error // if the peer is already known. -func (ps *peerSet) registerPeer(peer *eth.Peer, ext *snap.Peer, diffExt *diff.Peer) error { +func (ps *peerSet) registerPeer(peer *eth.Peer, ext *snap.Peer, diffExt *diff.Peer, trustExt *trust.Peer) error { // Start tracking the new peer ps.lock.Lock() defer ps.lock.Unlock() @@ -265,6 +371,9 @@ func (ps *peerSet) registerPeer(peer *eth.Peer, ext *snap.Peer, diffExt *diff.Pe if diffExt != nil { eth.diffExt = &diffPeer{diffExt} } + if trustExt != nil { + eth.trustExt = &trustPeer{trustExt} + } ps.peers[id] = eth return nil } diff --git a/eth/protocols/diff/protocol.go b/eth/protocols/diff/protocol.go index 4467d0b327..e6bf5b3e14 100644 --- a/eth/protocols/diff/protocol.go +++ b/eth/protocols/diff/protocol.go @@ -92,7 +92,7 @@ func (p *DiffLayersPacket) Unpack() ([]*types.DiffLayer, error) { var diffHash common.Hash hasher.Sum(diffHash[:0]) hasher.Reset() - diff.DiffHash = diffHash + diff.DiffHash.Store(diffHash) } return diffLayers, nil } diff --git a/eth/protocols/trust/discovery.go b/eth/protocols/trust/discovery.go new file mode 100644 index 0000000000..ce38ec5ed9 --- /dev/null +++ b/eth/protocols/trust/discovery.go @@ -0,0 +1,14 @@ +package trust + +import "github.com/ethereum/go-ethereum/rlp" + +// enrEntry is the ENR entry which advertises `trust` protocol on the discovery. +type enrEntry struct { + // Ignore additional fields (for forward compatibility). + Rest []rlp.RawValue `rlp:"tail"` +} + +// ENRKey implements enr.Entry. +func (e enrEntry) ENRKey() string { + return "trust" +} diff --git a/eth/protocols/trust/handler.go b/eth/protocols/trust/handler.go new file mode 100644 index 0000000000..f10aff5178 --- /dev/null +++ b/eth/protocols/trust/handler.go @@ -0,0 +1,157 @@ +package trust + +import ( + "fmt" + "time" + + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" +) + +// Handler is a callback to invoke from an outside runner after the boilerplate +// exchanges have passed. +type Handler func(peer *Peer) error + +type Backend interface { + // Chain retrieves the blockchain object to serve data. + Chain() *core.BlockChain + + // RunPeer is invoked when a peer joins on the `eth` protocol. The handler + // should do any peer maintenance work, handshakes and validations. If all + // is passed, control should be given back to the `handler` to process the + // inbound messages going forward. + RunPeer(peer *Peer, handler Handler) error + + PeerInfo(id enode.ID) interface{} + + Handle(peer *Peer, packet Packet) error +} + +// MakeProtocols constructs the P2P protocol definitions for `trust`. +func MakeProtocols(backend Backend, dnsdisc enode.Iterator) []p2p.Protocol { + // Filter the discovery iterator for nodes advertising trust support. + dnsdisc = enode.Filter(dnsdisc, func(n *enode.Node) bool { + var trust enrEntry + return n.Load(&trust) == nil + }) + + protocols := make([]p2p.Protocol, len(ProtocolVersions)) + for i, version := range ProtocolVersions { + version := version // Closure + + protocols[i] = p2p.Protocol{ + Name: ProtocolName, + Version: version, + Length: protocolLengths[version], + Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { + return backend.RunPeer(NewPeer(version, p, rw), func(peer *Peer) error { + defer peer.Close() + return Handle(backend, peer) + }) + }, + NodeInfo: func() interface{} { + return nodeInfo(backend.Chain()) + }, + PeerInfo: func(id enode.ID) interface{} { + return backend.PeerInfo(id) + }, + Attributes: []enr.Entry{&enrEntry{}}, + DialCandidates: dnsdisc, + } + } + return protocols +} + +// Handle is the callback invoked to manage the life cycle of a `trust` peer. +// When this function terminates, the peer is disconnected. +func Handle(backend Backend, peer *Peer) error { + for { + if err := handleMessage(backend, peer); err != nil { + peer.Log().Debug("Message handling failed in `trust`", "err", err) + return err + } + } +} + +// handleMessage is invoked whenever an inbound message is received from a +// remote peer on the `diff` protocol. The remote connection is torn down upon +// returning any error. +func handleMessage(backend Backend, peer *Peer) error { + // Read the next message from the remote peer, and ensure it's fully consumed + msg, err := peer.rw.ReadMsg() + if err != nil { + return err + } + if msg.Size > maxMessageSize { + return fmt.Errorf("%w: %v > %v", errMsgTooLarge, msg.Size, maxMessageSize) + } + defer msg.Discard() + + // Track the amount of time it takes to serve the request and run the handler + if metrics.Enabled { + h := fmt.Sprintf("%s/%s/%d/%#02x", p2p.HandleHistName, ProtocolName, peer.Version(), msg.Code) + defer func(start time.Time) { + sampler := func() metrics.Sample { + return metrics.ResettingSample( + metrics.NewExpDecaySample(1028, 0.015), + ) + } + metrics.GetOrRegisterHistogramLazy(h, nil, sampler).Update(time.Since(start).Microseconds()) + }(time.Now()) + } + // Handle the message depending on its contents + switch { + case msg.Code == RequestRootMsg: + return handleRootRequest(backend, msg, peer) + + case msg.Code == RespondRootMsg: + return handleRootResponse(backend, msg, peer) + + default: + return fmt.Errorf("%w: %v", errInvalidMsgCode, msg.Code) + } +} + +type Decoder interface { + Decode(val interface{}) error + Time() time.Time +} + +func handleRootRequest(backend Backend, msg Decoder, peer *Peer) error { + req := new(RootRequestPacket) + if err := msg.Decode(req); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + + res := backend.Chain().GetVerifyResult(req.BlockNumber, req.BlockHash, req.DiffHash) + return p2p.Send(peer.rw, RespondRootMsg, RootResponsePacket{ + RequestId: req.RequestId, + Status: res.Status, + BlockNumber: req.BlockNumber, + BlockHash: req.BlockHash, + Root: res.Root, + Extra: defaultExtra, + }) +} + +func handleRootResponse(backend Backend, msg Decoder, peer *Peer) error { + res := new(RootResponsePacket) + if err := msg.Decode(res); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + + requestTracker.Fulfil(peer.id, peer.version, RespondRootMsg, res.RequestId) + return backend.Handle(peer, res) +} + +// NodeInfo represents a short summary of the `trust` sub-protocol metadata +// known about the host peer. +type NodeInfo struct{} + +// nodeInfo retrieves some `trust` protocol metadata about the running host node. +func nodeInfo(chain *core.BlockChain) *NodeInfo { + return &NodeInfo{} +} diff --git a/eth/protocols/trust/handler_test.go b/eth/protocols/trust/handler_test.go new file mode 100644 index 0000000000..8e9ef6ad58 --- /dev/null +++ b/eth/protocols/trust/handler_test.go @@ -0,0 +1,271 @@ +package trust + +import ( + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus/clique" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/params" +) + +var ( + // testKey is a private key to use for funding a tester account. + testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + + // testAddr is the Ethereum address of the tester account. + testAddr = crypto.PubkeyToAddress(testKey.PublicKey) +) + +// testBackend is a mock implementation of the live Ethereum message handler. Its +// purpose is to allow testing the request/reply workflows and wire serialization +// in the `eth` protocol without actually doing any data processing. +type testBackend struct { + db ethdb.Database + chain *core.BlockChain + txpool *core.TxPool +} + +// newTestBackend creates an empty chain and wraps it into a mock backend. +func newTestBackend(blocks int) *testBackend { + return newTestBackendWithGenerator(blocks) +} + +// newTestBackend creates a chain with a number of explicitly defined blocks and +// wraps it into a mock backend. +func newTestBackendWithGenerator(blocks int) *testBackend { + signer := types.HomesteadSigner{} + db := rawdb.NewMemoryDatabase() + engine := clique.New(params.AllCliqueProtocolChanges.Clique, db) + genspec := &core.Genesis{ + //Config: params.TestChainConfig, + ExtraData: make([]byte, 32+common.AddressLength+65), + Alloc: core.GenesisAlloc{testAddr: {Balance: big.NewInt(100000000000000000)}}, + BaseFee: big.NewInt(0), + } + copy(genspec.ExtraData[32:], testAddr[:]) + genesis := genspec.MustCommit(db) + + chain, _ := core.NewBlockChain(db, nil, params.AllCliqueProtocolChanges, engine, vm.Config{}, nil, nil) + generator := func(i int, block *core.BlockGen) { + // The chain maker doesn't have access to a chain, so the difficulty will be + // lets unset (nil). Set it here to the correct value. + // block.SetCoinbase(testAddr) + block.SetDifficulty(big.NewInt(2)) + + // We want to simulate an empty middle block, having the same state as the + // first one. The last is needs a state change again to force a reorg. + tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddr), common.Address{0x01}, big.NewInt(1), params.TxGas, nil, nil), signer, testKey) + if err != nil { + panic(err) + } + block.AddTxWithChain(chain, tx) + } + + bs, _ := core.GenerateChain(params.AllCliqueProtocolChanges, genesis, engine, db, blocks, generator) + for i, block := range bs { + header := block.Header() + if i > 0 { + header.ParentHash = bs[i-1].Hash() + } + header.Extra = make([]byte, 32+65) + header.Difficulty = big.NewInt(2) + + sig, _ := crypto.Sign(clique.SealHash(header).Bytes(), testKey) + copy(header.Extra[len(header.Extra)-65:], sig) + bs[i] = block.WithSeal(header) + } + + if _, err := chain.InsertChain(bs); err != nil { + panic(err) + } + + txconfig := core.DefaultTxPoolConfig + txconfig.Journal = "" // Don't litter the disk with test journals + + return &testBackend{ + db: db, + chain: chain, + txpool: core.NewTxPool(txconfig, params.AllCliqueProtocolChanges, chain), + } +} + +// close tears down the transaction pool and chain behind the mock backend. +func (b *testBackend) close() { + b.txpool.Stop() + b.chain.Stop() +} + +func (b *testBackend) Chain() *core.BlockChain { return b.chain } + +func (b *testBackend) RunPeer(peer *Peer, handler Handler) error { + // Normally the backend would do peer mainentance and handshakes. All that + // is omitted and we will just give control back to the handler. + return handler(peer) +} +func (b *testBackend) PeerInfo(enode.ID) interface{} { panic("not implemented") } + +func (b *testBackend) Handle(*Peer, Packet) error { + panic("data processing tests should be done in the handler package") +} + +func TestRequestRoot(t *testing.T) { testRequestRoot(t, Trust1) } + +func testRequestRoot(t *testing.T, protocol uint) { + t.Parallel() + + blockNum := 1032 // The latest 1024 blocks' DiffLayer will be cached. + backend := newTestBackend(blockNum) + defer backend.close() + + peer, _ := newTestPeer("peer", protocol, backend) + defer peer.close() + + pairs := []struct { + req RootRequestPacket + res RootResponsePacket + }{ + { + req: RootRequestPacket{ + RequestId: 1, + BlockNumber: 1, + }, + res: RootResponsePacket{ + RequestId: 1, + Status: types.StatusPartiallyVerified, + BlockNumber: 1, + Extra: defaultExtra, + }, + }, + { + req: RootRequestPacket{ + RequestId: 2, + BlockNumber: 128, + }, + res: RootResponsePacket{ + RequestId: 2, + Status: types.StatusFullVerified, + BlockNumber: 128, + Extra: defaultExtra, + }, + }, + { + req: RootRequestPacket{ + RequestId: 3, + BlockNumber: 128, + BlockHash: types.EmptyRootHash, + DiffHash: types.EmptyRootHash, + }, + res: RootResponsePacket{ + RequestId: 3, + Status: types.StatusImpossibleFork, + BlockNumber: 128, + BlockHash: types.EmptyRootHash, + Root: common.Hash{}, + Extra: defaultExtra, + }, + }, + { + req: RootRequestPacket{ + RequestId: 4, + BlockNumber: 128, + DiffHash: types.EmptyRootHash, + }, + res: RootResponsePacket{ + RequestId: 4, + Status: types.StatusDiffHashMismatch, + BlockNumber: 128, + Root: common.Hash{}, + Extra: defaultExtra, + }, + }, + { + req: RootRequestPacket{ + RequestId: 5, + BlockNumber: 1024, + }, + res: RootResponsePacket{ + RequestId: 5, + Status: types.StatusFullVerified, + BlockNumber: 1024, + Extra: defaultExtra, + }, + }, + { + req: RootRequestPacket{ + RequestId: 6, + BlockNumber: 1024, + BlockHash: types.EmptyRootHash, + DiffHash: types.EmptyRootHash, + }, + res: RootResponsePacket{ + RequestId: 6, + Status: types.StatusPossibleFork, + BlockNumber: 1024, + BlockHash: types.EmptyRootHash, + Root: common.Hash{}, + Extra: defaultExtra, + }, + }, + { + req: RootRequestPacket{ + RequestId: 7, + BlockNumber: 1033, + BlockHash: types.EmptyRootHash, + DiffHash: types.EmptyRootHash, + }, + res: RootResponsePacket{ + RequestId: 7, + Status: types.StatusBlockNewer, + BlockNumber: 1033, + BlockHash: types.EmptyRootHash, + Root: common.Hash{}, + Extra: defaultExtra, + }, + }, + { + req: RootRequestPacket{ + RequestId: 8, + BlockNumber: 1044, + BlockHash: types.EmptyRootHash, + DiffHash: types.EmptyRootHash, + }, + res: RootResponsePacket{ + RequestId: 8, + Status: types.StatusBlockTooNew, + BlockNumber: 1044, + BlockHash: types.EmptyRootHash, + Root: common.Hash{}, + Extra: defaultExtra, + }, + }, + } + + for idx, pair := range pairs { + header := backend.Chain().GetHeaderByNumber(pair.req.BlockNumber) + if header != nil { + if pair.res.Status.Code&0xFF00 == types.StatusVerified.Code { + pair.req.BlockHash = header.Hash() + pair.req.DiffHash, _ = core.CalculateDiffHash(backend.Chain().GetTrustedDiffLayer(header.Hash())) + pair.res.BlockHash = pair.req.BlockHash + pair.res.Root = header.Root + } else if pair.res.Status.Code == types.StatusDiffHashMismatch.Code { + pair.req.BlockHash = header.Hash() + pair.res.BlockHash = pair.req.BlockHash + } + } + + p2p.Send(peer.app, RequestRootMsg, pair.req) + if err := p2p.ExpectMsg(peer.app, RespondRootMsg, pair.res); err != nil { + t.Errorf("test %d: root response not expected: %v", idx, err) + } + } +} diff --git a/eth/protocols/trust/peer.go b/eth/protocols/trust/peer.go new file mode 100644 index 0000000000..18ba229914 --- /dev/null +++ b/eth/protocols/trust/peer.go @@ -0,0 +1,66 @@ +package trust + +import ( + "math/rand" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p" +) + +// Peer is a collection of relevant information we have about a `trust` peer. +type Peer struct { + id string // Unique ID for the peer, cached + + *p2p.Peer // The embedded P2P package peer + rw p2p.MsgReadWriter // Input/output streams for diff + version uint // Protocol version negotiated + logger log.Logger // Contextual logger with the peer id injected +} + +// NewPeer create a wrapper for a network connection and negotiated protocol +// version. +func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter) *Peer { + id := p.ID().String() + peer := &Peer{ + id: id, + Peer: p, + rw: rw, + version: version, + logger: log.New("peer", id[:8]), + } + return peer +} + +// ID retrieves the peer's unique identifier. +func (p *Peer) ID() string { + return p.id +} + +// Version retrieves the peer's negoatiated `diff` protocol version. +func (p *Peer) Version() uint { + return p.version +} + +// Log overrides the P2P logget with the higher level one containing only the id. +func (p *Peer) Log() log.Logger { + return p.logger +} + +// Close signals the broadcast goroutine to terminate. Only ever call this if +// you created the peer yourself via NewPeer. Otherwise let whoever created it +// clean it up! +func (p *Peer) Close() { +} + +func (p *Peer) RequestRoot(blockNumber uint64, blockHash common.Hash, diffHash common.Hash) error { + id := rand.Uint64() + + requestTracker.Track(p.id, p.version, RequestRootMsg, RespondRootMsg, id) + return p2p.Send(p.rw, RequestRootMsg, RootRequestPacket{ + RequestId: id, + BlockNumber: blockNumber, + BlockHash: blockHash, + DiffHash: diffHash, + }) +} diff --git a/eth/protocols/trust/peer_test.go b/eth/protocols/trust/peer_test.go new file mode 100644 index 0000000000..ab229a1b32 --- /dev/null +++ b/eth/protocols/trust/peer_test.go @@ -0,0 +1,42 @@ +package trust + +import ( + "math/rand" + + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" +) + +// testPeer is a simulated peer to allow testing direct network calls. +type testPeer struct { + *Peer + + net p2p.MsgReadWriter // Network layer reader/writer to simulate remote messaging + app *p2p.MsgPipeRW // Application layer reader/writer to simulate the local side +} + +// newTestPeer creates a new peer registered at the given data backend. +func newTestPeer(name string, version uint, backend Backend) (*testPeer, <-chan error) { + // Create a message pipe to communicate through + app, net := p2p.MsgPipe() + + // Start the peer on a new thread + var id enode.ID + rand.Read(id[:]) + + peer := NewPeer(version, p2p.NewPeer(id, name, nil), net) + errc := make(chan error, 1) + go func() { + errc <- backend.RunPeer(peer, func(peer *Peer) error { + return Handle(backend, peer) + }) + }() + return &testPeer{app: app, net: net, Peer: peer}, errc +} + +// close terminates the local side of the peer, notifying the remote protocol +// manager of termination. +func (p *testPeer) close() { + p.Peer.Close() + p.app.Close() +} diff --git a/eth/protocols/trust/protocol.go b/eth/protocols/trust/protocol.go new file mode 100644 index 0000000000..e4f98dd324 --- /dev/null +++ b/eth/protocols/trust/protocol.go @@ -0,0 +1,71 @@ +package trust + +import ( + "errors" + + "github.com/ethereum/go-ethereum/core/types" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rlp" +) + +// Constants to match up protocol versions and messages +const ( + Trust1 = 1 +) + +// ProtocolName is the official short name of the `trust` protocol used during +// devp2p capability negotiation. +const ProtocolName = "trust" + +// ProtocolVersions are the supported versions of the `trust` protocol (first +// is primary). +var ProtocolVersions = []uint{Trust1} + +// protocolLengths are the number of implemented message corresponding to +// different protocol versions. +var protocolLengths = map[uint]uint64{Trust1: 2} + +// maxMessageSize is the maximum cap on the size of a protocol message. +const maxMessageSize = 10 * 1024 * 1024 + +const ( + RequestRootMsg = 0x00 + RespondRootMsg = 0x01 +) + +var defaultExtra = []byte{0x00} + +var ( + errMsgTooLarge = errors.New("message too long") + errDecode = errors.New("invalid message") + errInvalidMsgCode = errors.New("invalid message code") +) + +// Packet represents a p2p message in the `trust` protocol. +type Packet interface { + Name() string // Name returns a string corresponding to the message type. + Kind() byte // Kind returns the message type. +} + +type RootRequestPacket struct { + RequestId uint64 + BlockNumber uint64 + BlockHash common.Hash + DiffHash common.Hash +} + +type RootResponsePacket struct { + RequestId uint64 + Status types.VerifyStatus + BlockNumber uint64 + BlockHash common.Hash + Root common.Hash + Extra rlp.RawValue // for extension +} + +func (*RootRequestPacket) Name() string { return "RequestRoot" } +func (*RootRequestPacket) Kind() byte { return RequestRootMsg } + +func (*RootResponsePacket) Name() string { return "RootResponse" } +func (*RootResponsePacket) Kind() byte { return RespondRootMsg } diff --git a/eth/protocols/trust/tracker.go b/eth/protocols/trust/tracker.go new file mode 100644 index 0000000000..ab492b3fb8 --- /dev/null +++ b/eth/protocols/trust/tracker.go @@ -0,0 +1,10 @@ +package trust + +import ( + "time" + + "github.com/ethereum/go-ethereum/p2p/tracker" +) + +// requestTracker is a singleton tracker for request times. +var requestTracker = tracker.New(ProtocolName, time.Minute) diff --git a/ethclient/ethclient.go b/ethclient/ethclient.go index 951b65940a..db388a3d9d 100644 --- a/ethclient/ethclient.go +++ b/ethclient/ethclient.go @@ -27,6 +27,7 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rpc" ) @@ -200,6 +201,12 @@ func (ec *Client) GetDiffAccountsWithScope(ctx context.Context, number *big.Int, return &result, err } +func (ec *Client) GetRootByDiffHash(ctx context.Context, blockNr *big.Int, blockHash common.Hash, diffHash common.Hash) (*core.VerifyResult, error) { + var result core.VerifyResult + err := ec.c.CallContext(ctx, &result, "eth_getRootByDiffHash", toBlockNumArg(blockNr), blockHash, diffHash) + return &result, err +} + type rpcTransaction struct { tx *types.Transaction txExtraInfo diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 6473ec4535..58fa326344 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -1335,6 +1335,10 @@ func (s *PublicBlockChainAPI) GetDiffAccountsWithScope(ctx context.Context, bloc return result, err } +func (s *PublicBlockChainAPI) GetVerifyResult(ctx context.Context, blockNr rpc.BlockNumber, blockHash common.Hash, diffHash common.Hash) *core.VerifyResult { + return s.b.Chain().GetVerifyResult(uint64(blockNr), blockHash, diffHash) +} + // ExecutionResult groups all structured logs emitted by the EVM // while replaying a transaction in debug mode as well as transaction // execution status, the amount of gas used and the return value diff --git a/light/trie.go b/light/trie.go index 418ffaa3b6..d41536e069 100644 --- a/light/trie.go +++ b/light/trie.go @@ -50,6 +50,10 @@ type odrDatabase struct { backend OdrBackend } +func (db *odrDatabase) NoTries() bool { + return false +} + func (db *odrDatabase) OpenTrie(root common.Hash) (state.Trie, error) { return &odrTrie{db: db, id: db.id}, nil } @@ -190,6 +194,10 @@ func (t *odrTrie) do(key []byte, fn func() error) error { } } +func (db *odrTrie) NoTries() bool { + return false +} + type nodeIterator struct { trie.NodeIterator t *odrTrie diff --git a/p2p/peer.go b/p2p/peer.go index 96783d83ac..7b81d4731b 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -235,6 +235,11 @@ func (p *Peer) Inbound() bool { return p.rw.is(inboundConn) } +// VerifyNode returns true if the peer is a verification connection +func (p *Peer) VerifyNode() bool { + return p.rw.is(verifyConn) +} + func newPeer(log log.Logger, conn *conn, protocols []Protocol) *Peer { protomap := matchProtocols(protocols, conn.caps, conn) p := &Peer{ diff --git a/p2p/server.go b/p2p/server.go index e076fd9348..c816e36762 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -114,6 +114,10 @@ type Config struct { // maintained and re-connected on disconnects. StaticNodes []*enode.Node + // Verify nodes are used as pre-configured connections which are always + // maintained and re-connected on disconnects. + VerifyNodes []*enode.Node + // Trusted nodes are used as pre-configured connections which are always // allowed to connect, even above the peer limit. TrustedNodes []*enode.Node @@ -218,6 +222,7 @@ const ( staticDialedConn inboundConn trustedConn + verifyConn ) // conn wraps a network connection with information gathered @@ -269,6 +274,9 @@ func (f connFlag) String() string { if f&inboundConn != 0 { s += "-inbound" } + if f&verifyConn != 0 { + s += "-verify" + } if s != "" { s = s[1:] } @@ -649,6 +657,9 @@ func (srv *Server) setupDialScheduler() { for _, n := range srv.StaticNodes { srv.dialsched.addStatic(n) } + for _, n := range srv.VerifyNodes { + srv.dialsched.addStatic(n) + } } func (srv *Server) maxInboundConns() int { @@ -934,6 +945,13 @@ func (srv *Server) checkInboundConn(remoteIP net.IP) error { // as a peer. It returns when the connection has been added as a peer // or the handshakes have failed. func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error { + // If dialDest is verify node, set verifyConn flags. + for _, n := range srv.VerifyNodes { + if dialDest == n { + flags |= verifyConn + } + } + c := &conn{fd: fd, flags: flags, cont: make(chan error)} if dialDest == nil { c.transport = srv.newTransport(fd, nil) diff --git a/tests/state_test_util.go b/tests/state_test_util.go index 3e66a1c55b..e71cd2bf3c 100644 --- a/tests/state_test_util.go +++ b/tests/state_test_util.go @@ -271,7 +271,7 @@ func MakePreState(db ethdb.Database, accounts core.GenesisAlloc, snapshotter boo var snaps *snapshot.Tree if snapshotter { - snaps, _ = snapshot.New(db, sdb.TrieDB(), 1, 128, root, false, true, false) + snaps, _ = snapshot.New(db, sdb.TrieDB(), 1, 128, root, false, true, false, false) } statedb, _ = state.New(root, sdb, snaps) return snaps, statedb diff --git a/trie/database.go b/trie/database.go index fcbade7f62..b8d1aa6eea 100644 --- a/trie/database.go +++ b/trie/database.go @@ -282,6 +282,7 @@ type Config struct { Cache int // Memory allowance (MB) to use for caching trie nodes in memory Journal string // Journal of clean cache to survive node restarts Preimages bool // Flag whether the preimage of trie key is recorded + NoTries bool } // NewDatabase creates a new trie database to store ephemeral trie content before diff --git a/trie/dummy_trie.go b/trie/dummy_trie.go new file mode 100644 index 0000000000..f01962cff9 --- /dev/null +++ b/trie/dummy_trie.go @@ -0,0 +1,101 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package trie + +import ( + "fmt" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" +) + +type EmptyTrie struct{} + +func (t *EmptyTrie) Prove(key []byte, fromLevel uint, proofDb ethdb.KeyValueWriter) error { + return nil +} + +// NewSecure creates a dummy trie +func NewEmptyTrie() *EmptyTrie { + return &EmptyTrie{} +} + +func (t *EmptyTrie) Get(key []byte) []byte { + return nil +} + +func (t *EmptyTrie) TryGet(key []byte) ([]byte, error) { + return nil, nil +} + +func (t *EmptyTrie) TryGetNode(path []byte) ([]byte, int, error) { + return nil, 0, nil +} +func (t *EmptyTrie) Update(key, value []byte) {} + +func (t *EmptyTrie) TryUpdate(key, value []byte) error { + return nil +} + +func (t *EmptyTrie) TryUpdateAccount(key []byte, acc *types.StateAccount) error { + return nil +} + +// Delete removes any existing value for key from the trie. +func (t *EmptyTrie) Delete(key []byte) { + if err := t.TryDelete(key); err != nil { + log.Error(fmt.Sprintf("Unhandled trie error: %v", err)) + } +} + +func (t *EmptyTrie) TryDelete(key []byte) error { + + return nil +} + +func (t *EmptyTrie) GetKey(shaKey []byte) []byte { + return nil +} + +func (t *EmptyTrie) Commit(onleaf LeafCallback) (root common.Hash, committed int, err error) { + + return common.Hash{}, 0, nil +} + +func (t *EmptyTrie) Hash() common.Hash { + return common.Hash{} +} + +// Copy returns a copy of SecureTrie. +func (t *EmptyTrie) Copy() *EmptyTrie { + cpy := *t + return &cpy +} + +func (t *EmptyTrie) ResetCopy() *EmptyTrie { + cpy := *t + return &cpy +} + +// NodeIterator returns an iterator that returns nodes of the underlying trie. Iteration +// starts at the key after the given start key. +func (t *EmptyTrie) NodeIterator(start []byte) NodeIterator { + return nil +}