Skip to content

Commit

Permalink
fix useful difflayer item in cache been prune issue
Browse files Browse the repository at this point in the history
add logs

fix useful difflayer item in cache been prune issue

fix too many open files
  • Loading branch information
unclezoro committed Nov 9, 2021
1 parent 176407a commit fea8dd7
Show file tree
Hide file tree
Showing 12 changed files with 80 additions and 80 deletions.
4 changes: 2 additions & 2 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,8 +444,8 @@ var (
}
DiffBlockFlag = cli.Uint64Flag{
Name: "diffblock",
Usage: "The number of blocks should be persisted in db (default = 864000 )",
Value: uint64(864000),
Usage: "The number of blocks should be persisted in db (default = 86400)",
Value: uint64(86400),
}
// Miner settings
MiningEnabledFlag = cli.BoolFlag{
Expand Down
42 changes: 10 additions & 32 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,9 @@ func (bc *BlockChain) cacheReceipts(hash common.Hash, receipts types.Receipts) {
}

func (bc *BlockChain) cacheDiffLayer(diffLayer *types.DiffLayer) {
if bc.diffLayerCache.Len() >= diffLayerCacheLimit {
bc.diffLayerCache.RemoveOldest()
}
bc.diffLayerCache.Add(diffLayer.BlockHash, diffLayer)
if bc.db.DiffStore() != nil {
// push to priority queue before persisting
Expand Down Expand Up @@ -2618,34 +2621,6 @@ func (bc *BlockChain) removeDiffLayers(diffHash common.Hash) {
}
}

func (bc *BlockChain) RemoveDiffPeer(pid string) {
bc.diffMux.Lock()
defer bc.diffMux.Unlock()
if invaliDiffHashes := bc.diffPeersToDiffHashes[pid]; invaliDiffHashes != nil {
for invalidDiffHash := range invaliDiffHashes {
lastDiffHash := false
if peers, ok := bc.diffHashToPeers[invalidDiffHash]; ok {
delete(peers, pid)
if len(peers) == 0 {
lastDiffHash = true
delete(bc.diffHashToPeers, invalidDiffHash)
}
}
if lastDiffHash {
affectedBlockHash := bc.diffHashToBlockHash[invalidDiffHash]
if diffs, exist := bc.blockHashToDiffLayers[affectedBlockHash]; exist {
delete(diffs, invalidDiffHash)
if len(diffs) == 0 {
delete(bc.blockHashToDiffLayers, affectedBlockHash)
}
}
delete(bc.diffHashToBlockHash, invalidDiffHash)
}
}
delete(bc.diffPeersToDiffHashes, pid)
}
}

func (bc *BlockChain) untrustedDiffLayerPruneLoop() {
recheck := time.NewTicker(diffLayerPruneRecheckInterval)
bc.wg.Add(1)
Expand Down Expand Up @@ -2713,24 +2688,27 @@ func (bc *BlockChain) HandleDiffLayer(diffLayer *types.DiffLayer, pid string, fu
// Basic check
currentHeight := bc.CurrentBlock().NumberU64()
if diffLayer.Number > currentHeight && diffLayer.Number-currentHeight > maxDiffQueueDist {
log.Error("diff layers too new from current", "pid", pid)
log.Debug("diff layers too new from current", "pid", pid)
return nil
}
if diffLayer.Number < currentHeight && currentHeight-diffLayer.Number > maxDiffForkDist {
log.Error("diff layers too old from current", "pid", pid)
log.Debug("diff layers too old from current", "pid", pid)
return nil
}

bc.diffMux.Lock()
defer bc.diffMux.Unlock()
if blockHash, exist := bc.diffHashToBlockHash[diffLayer.DiffHash]; exist && blockHash == diffLayer.BlockHash {
return nil
}

if !fulfilled && len(bc.diffPeersToDiffHashes[pid]) > maxDiffLimitForBroadcast {
log.Error("too many accumulated diffLayers", "pid", pid)
log.Debug("too many accumulated diffLayers", "pid", pid)
return nil
}

if len(bc.diffPeersToDiffHashes[pid]) > maxDiffLimit {
log.Error("too many accumulated diffLayers", "pid", pid)
log.Debug("too many accumulated diffLayers", "pid", pid)
return nil
}
if _, exist := bc.diffPeersToDiffHashes[pid]; exist {
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain_diff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func TestPruneDiffLayer(t *testing.T) {
if len(fullBackend.chain.diffNumToBlockHashes) != maxDiffForkDist {
t.Error("unexpected size of diffNumToBlockHashes")
}
if len(fullBackend.chain.diffPeersToDiffHashes) != 2 {
if len(fullBackend.chain.diffPeersToDiffHashes) != 1 {
t.Error("unexpected size of diffPeersToDiffHashes")
}
if len(fullBackend.chain.blockHashToDiffLayers) != maxDiffForkDist {
Expand Down
13 changes: 7 additions & 6 deletions core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ import (

const (
fullProcessCheck = 21 // On diff sync mode, will do full process every fullProcessCheck randomly
recentTime = 2048 * 3
recentDiffLayerTimeout = 20
recentTime = 1024 * 3
recentDiffLayerTimeout = 5
farDiffLayerTimeout = 2
)

Expand All @@ -68,15 +68,16 @@ func NewStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consen
}

type LightStateProcessor struct {
randomGenerator *rand.Rand
check int64
StateProcessor
}

func NewLightStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine) *LightStateProcessor {
randomGenerator := rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
check := randomGenerator.Int63n(fullProcessCheck)
return &LightStateProcessor{
randomGenerator: randomGenerator,
StateProcessor: *NewStateProcessor(config, bc, engine),
check: check,
StateProcessor: *NewStateProcessor(config, bc, engine),
}
}

Expand All @@ -86,7 +87,7 @@ func (p *LightStateProcessor) Process(block *types.Block, statedb *state.StateDB
allowLightProcess = posa.AllowLightProcess(p.bc, block.Header())
}
// random fallback to full process
if check := p.randomGenerator.Int63n(fullProcessCheck); allowLightProcess && check != 0 && len(block.Transactions()) != 0 {
if allowLightProcess && block.NumberU64()%fullProcessCheck != uint64(p.check) && len(block.Transactions()) != 0 {
var pid string
if peer, ok := block.ReceivedFrom.(PeerIDer); ok {
pid = peer.ID()
Expand Down
68 changes: 41 additions & 27 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ var (
ttlScaling = 3 // Constant scaling factor for RTT -> TTL conversion
ttlLimit = time.Minute // Maximum TTL allowance to prevent reaching crazy timeouts

diffFetchTick = 10 * time.Millisecond
diffFetchLimit = 5

qosTuningPeers = 5 // Number of peers to tune based on (best peers)
qosConfidenceCap = 10 // Number of peers above which not to modify RTT confidence
qosTuningImpact = 0.25 // Impact that a new tuning target has on the previous value
Expand Down Expand Up @@ -161,10 +164,10 @@ type Downloader struct {
quitLock sync.Mutex // Lock to prevent double closes

// Testing hooks
syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
bodyFetchHook func([]*types.Header, ...interface{}) // Method to call upon starting a block body fetch
receiptFetchHook func([]*types.Header, ...interface{}) // Method to call upon starting a receipt fetch
chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
chainInsertHook func([]*fetchResult, chan struct{}) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
}

// LightChain encapsulates functions required to synchronise a light chain.
Expand Down Expand Up @@ -232,27 +235,35 @@ type IPeerSet interface {

func EnableDiffFetchOp(peers IPeerSet) DownloadOption {
return func(dl *Downloader) *Downloader {
var hook = func(headers []*types.Header, args ...interface{}) {
if len(args) < 2 {
return
}
peerID, ok := args[1].(string)
if !ok {
return
}
mode, ok := args[0].(SyncMode)
if !ok {
return
}
if ep := peers.GetDiffPeer(peerID); mode == FullSync && ep != nil {
hashes := make([]common.Hash, 0, len(headers))
for _, header := range headers {
hashes = append(hashes, header.Hash())
}
ep.RequestDiffLayers(hashes)
var hook = func(results []*fetchResult, stop chan struct{}) {
if dl.getMode() == FullSync {
go func() {
ticker := time.NewTicker(diffFetchTick)
defer ticker.Stop()
for _, r := range results {
Wait:
for {
select {
case <-stop:
return
case <-ticker.C:
if dl.blockchain.CurrentHeader().Number.Int64()+int64(diffFetchLimit) > r.Header.Number.Int64() {
break Wait
}
}
}
if ep := peers.GetDiffPeer(r.pid); ep != nil {
// It turns out a diff layer is 5x larger than block, we just request one diffLayer each time
err := ep.RequestDiffLayers([]common.Hash{r.Header.Hash()})
if err != nil {
return
}
}
}
}()
}
}
dl.bodyFetchHook = hook
dl.chainInsertHook = hook
return dl
}
}
Expand Down Expand Up @@ -1405,7 +1416,7 @@ func (d *Downloader) fetchReceipts(from uint64) error {
// - kind: textual label of the type being downloaded to display in log messages
func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
expire func() map[string]int, pending func() int, inFlight func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, bool),
fetchHook func([]*types.Header, ...interface{}), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int, time.Time), kind string) error {

// Create a ticker to detect expired retrieval tasks
Expand Down Expand Up @@ -1554,7 +1565,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
}
// Fetch the chunk and make sure any errors return the hashes to the queue
if fetchHook != nil {
fetchHook(request.Headers, d.getMode(), peer.id)
fetchHook(request.Headers)
}
if err := fetch(peer, request); err != nil {
// Although we could try and make an attempt to fix this, this error really
Expand Down Expand Up @@ -1759,12 +1770,15 @@ func (d *Downloader) processFullSyncContent() error {
if len(results) == 0 {
return nil
}
stop := make(chan struct{})
if d.chainInsertHook != nil {
d.chainInsertHook(results)
d.chainInsertHook(results, stop)
}
if err := d.importBlockResults(results); err != nil {
close(stop)
return err
}
close(stop)
}
}

Expand Down Expand Up @@ -1850,7 +1864,7 @@ func (d *Downloader) processFastSyncContent() error {
}
}
if d.chainInsertHook != nil {
d.chainInsertHook(results)
d.chainInsertHook(results, nil)
}
// If we haven't downloaded the pivot block yet, check pivot staleness
// notifications from the header downloader
Expand Down
6 changes: 3 additions & 3 deletions eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ func testThrottling(t *testing.T, protocol uint, mode SyncMode) {

// Wrap the importer to allow stepping
blocked, proceed := uint32(0), make(chan struct{})
tester.downloader.chainInsertHook = func(results []*fetchResult) {
tester.downloader.chainInsertHook = func(results []*fetchResult, _ chan struct{}) {
atomic.StoreUint32(&blocked, uint32(len(results)))
<-proceed
}
Expand Down Expand Up @@ -921,10 +921,10 @@ func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) {

// Instrument the downloader to signal body requests
bodiesHave, receiptsHave := int32(0), int32(0)
tester.downloader.bodyFetchHook = func(headers []*types.Header, _ ...interface{}) {
tester.downloader.bodyFetchHook = func(headers []*types.Header) {
atomic.AddInt32(&bodiesHave, int32(len(headers)))
}
tester.downloader.receiptFetchHook = func(headers []*types.Header, _ ...interface{}) {
tester.downloader.receiptFetchHook = func(headers []*types.Header) {
atomic.AddInt32(&receiptsHave, int32(len(headers)))
}
// Synchronise with the peer and make sure all blocks were retrieved
Expand Down
6 changes: 4 additions & 2 deletions eth/downloader/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,18 @@ type fetchRequest struct {
// all outstanding pieces complete and the result as a whole can be processed.
type fetchResult struct {
pending int32 // Flag telling what deliveries are outstanding
pid string

Header *types.Header
Uncles []*types.Header
Transactions types.Transactions
Receipts types.Receipts
}

func newFetchResult(header *types.Header, fastSync bool) *fetchResult {
func newFetchResult(header *types.Header, fastSync bool, pid string) *fetchResult {
item := &fetchResult{
Header: header,
pid: pid,
}
if !header.EmptyBody() {
item.pending |= (1 << bodyType)
Expand Down Expand Up @@ -503,7 +505,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
// we can ask the resultcache if this header is within the
// "prioritized" segment of blocks. If it is not, we need to throttle

stale, throttle, item, err := q.resultCache.AddFetch(header, q.mode == FastSync)
stale, throttle, item, err := q.resultCache.AddFetch(header, q.mode == FastSync, p.id)
if stale {
// Don't put back in the task queue, this item has already been
// delivered upstream
Expand Down
4 changes: 2 additions & 2 deletions eth/downloader/resultstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (r *resultStore) SetThrottleThreshold(threshold uint64) uint64 {
// throttled - if true, the store is at capacity, this particular header is not prio now
// item - the result to store data into
// err - any error that occurred
func (r *resultStore) AddFetch(header *types.Header, fastSync bool) (stale, throttled bool, item *fetchResult, err error) {
func (r *resultStore) AddFetch(header *types.Header, fastSync bool, pid string) (stale, throttled bool, item *fetchResult, err error) {
r.lock.Lock()
defer r.lock.Unlock()

Expand All @@ -85,7 +85,7 @@ func (r *resultStore) AddFetch(header *types.Header, fastSync bool) (stale, thro
return stale, throttled, item, err
}
if item == nil {
item = newFetchResult(header, fastSync)
item = newFetchResult(header, fastSync, pid)
r.items[index] = item
}
return stale, throttled, item, err
Expand Down
2 changes: 1 addition & 1 deletion eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ var Defaults = Config{
TrieTimeout: 60 * time.Minute,
TriesInMemory: 128,
SnapshotCache: 102,
DiffBlock: uint64(864000),
DiffBlock: uint64(86400),
Miner: miner.Config{
GasFloor: 8000000,
GasCeil: 8000000,
Expand Down
1 change: 0 additions & 1 deletion eth/handler_diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func (h *diffHandler) RunPeer(peer *diff.Peer, hand diff.Handler) error {
ps.lock.Unlock()
return err
}
defer h.chain.RemoveDiffPeer(peer.ID())
return (*handler)(h).runDiffExtension(peer, hand)
}

Expand Down
2 changes: 1 addition & 1 deletion eth/protocols/diff/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const (
softResponseLimit = 2 * 1024 * 1024

// maxDiffLayerServe is the maximum number of diff layers to serve.
maxDiffLayerServe = 1024
maxDiffLayerServe = 128
)

var requestTracker = NewTracker(time.Minute)
Expand Down
10 changes: 8 additions & 2 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ const (
closedState
)

const chainDataHandlesPercentage = 80

// New creates a new P2P node, ready for protocol registration.
func New(conf *Config) (*Node, error) {
// Copy config and resolve the datadir so future changes to the current
Expand Down Expand Up @@ -580,12 +582,16 @@ func (n *Node) OpenDatabase(name string, cache, handles int, namespace string, r
}

func (n *Node) OpenAndMergeDatabase(name string, cache, handles int, freezer, diff, namespace string, readonly, persistDiff bool) (ethdb.Database, error) {
chainDB, err := n.OpenDatabaseWithFreezer(name, cache, handles, freezer, namespace, readonly)
chainDataHandles := handles
if persistDiff {
chainDataHandles = handles * chainDataHandlesPercentage / 100
}
chainDB, err := n.OpenDatabaseWithFreezer(name, cache, chainDataHandles, freezer, namespace, readonly)
if err != nil {
return nil, err
}
if persistDiff {
diffStore, err := n.OpenDiffDatabase(name, handles, diff, namespace, readonly)
diffStore, err := n.OpenDiffDatabase(name, handles-chainDataHandles, diff, namespace, readonly)
if err != nil {
chainDB.Close()
return nil, err
Expand Down

0 comments on commit fea8dd7

Please sign in to comment.