From 32f15203c1938e40748e5436fbb418859c6d6127 Mon Sep 17 00:00:00 2001 From: Bin <49082129+songzhibin97@users.noreply.github.com> Date: Tue, 9 Apr 2024 14:51:54 +0800 Subject: [PATCH] all: use timer instead of time.After in loops, to avoid memleaks (#29241) time.After is equivalent to NewTimer(d).C, and does not call Stop if the timer is no longer needed. This can cause memory leaks. This change changes many such occations to use NewTimer instead, and calling Stop once the timer is no longer needed. --- core/bloombits/matcher.go | 6 +++++- eth/downloader/beaconsync.go | 6 +++++- eth/downloader/downloader.go | 12 ++++++++++-- ethstats/ethstats.go | 5 ++++- p2p/simulations/adapters/exec.go | 5 ++++- p2p/simulations/mocker.go | 10 ++++++++-- p2p/simulations/network.go | 5 ++++- 7 files changed, 40 insertions(+), 9 deletions(-) diff --git a/core/bloombits/matcher.go b/core/bloombits/matcher.go index 6a4cfb23db19..486581fe23d7 100644 --- a/core/bloombits/matcher.go +++ b/core/bloombits/matcher.go @@ -596,6 +596,9 @@ func (s *MatcherSession) deliverSections(bit uint, sections []uint64, bitsets [] // of the session, any request in-flight need to be responded to! Empty responses // are fine though in that case. func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan *Retrieval) { + waitTimer := time.NewTimer(wait) + defer waitTimer.Stop() + for { // Allocate a new bloom bit index to retrieve data for, stopping when done bit, ok := s.allocateRetrieval() @@ -604,6 +607,7 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan } // Bit allocated, throttle a bit if we're below our batch limit if s.pendingSections(bit) < batch { + waitTimer.Reset(wait) select { case <-s.quit: // Session terminating, we can't meaningfully service, abort @@ -611,7 +615,7 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan s.deliverSections(bit, []uint64{}, [][]byte{}) return - case <-time.After(wait): + case <-waitTimer.C: // Throttling up, fetch whatever is available } } diff --git a/eth/downloader/beaconsync.go b/eth/downloader/beaconsync.go index d3f75c852703..7dfc419f4e9c 100644 --- a/eth/downloader/beaconsync.go +++ b/eth/downloader/beaconsync.go @@ -289,6 +289,9 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error { localHeaders = d.readHeaderRange(tail, int(count)) log.Warn("Retrieved beacon headers from local", "from", from, "count", count) } + fsHeaderContCheckTimer := time.NewTimer(fsHeaderContCheck) + defer fsHeaderContCheckTimer.Stop() + for { // Some beacon headers might have appeared since the last cycle, make // sure we're always syncing to all available ones @@ -381,8 +384,9 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error { } // State sync still going, wait a bit for new headers and retry log.Trace("Pivot not yet committed, waiting...") + fsHeaderContCheckTimer.Reset(fsHeaderContCheck) select { - case <-time.After(fsHeaderContCheck): + case <-fsHeaderContCheckTimer.C: case <-d.cancelCh: return errCanceled } diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 6b26822e22b7..941f575aa898 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -1276,7 +1276,10 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode var ( mode = d.getMode() gotHeaders = false // Wait for batches of headers to process + timer = time.NewTimer(time.Second) ) + defer timer.Stop() + for { select { case <-d.cancelCh: @@ -1397,10 +1400,11 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode if mode == FullSync || mode == SnapSync { // If we've reached the allowed number of pending headers, stall a bit for d.queue.PendingBodies() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders { + timer.Reset(time.Second) select { case <-d.cancelCh: return errCanceled - case <-time.After(time.Second): + case <-timer.C: } } // Otherwise insert the headers for content retrieval @@ -1567,7 +1571,10 @@ func (d *Downloader) processSnapSyncContent() error { var ( oldPivot *fetchResult // Locked in pivot block, might change eventually oldTail []*fetchResult // Downloaded content after the pivot + timer = time.NewTimer(time.Second) ) + defer timer.Stop() + for { // Wait for the next batch of downloaded data to be available. If we have // not yet reached the pivot point, wait blockingly as there's no need to @@ -1650,6 +1657,7 @@ func (d *Downloader) processSnapSyncContent() error { oldPivot = P } // Wait for completion, occasionally checking for pivot staleness + timer.Reset(time.Second) select { case <-sync.done: if sync.err != nil { @@ -1660,7 +1668,7 @@ func (d *Downloader) processSnapSyncContent() error { } oldPivot = nil - case <-time.After(time.Second): + case <-timer.C: oldTail = afterP continue } diff --git a/ethstats/ethstats.go b/ethstats/ethstats.go index 6e71666ec121..c845db1164f5 100644 --- a/ethstats/ethstats.go +++ b/ethstats/ethstats.go @@ -544,10 +544,13 @@ func (s *Service) reportLatency(conn *connWrapper) error { return err } // Wait for the pong request to arrive back + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() + select { case <-s.pongCh: // Pong delivered, report the latency - case <-time.After(5 * time.Second): + case <-timer.C: // Ping timeout, abort return errors.New("ping timed out") } diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go index 5df2d7649cd8..6307b90bf81c 100644 --- a/p2p/simulations/adapters/exec.go +++ b/p2p/simulations/adapters/exec.go @@ -303,10 +303,13 @@ func (n *ExecNode) Stop() error { go func() { waitErr <- n.Cmd.Wait() }() + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() + select { case err := <-waitErr: return err - case <-time.After(5 * time.Second): + case <-timer.C: return n.Cmd.Process.Kill() } } diff --git a/p2p/simulations/mocker.go b/p2p/simulations/mocker.go index 0dc04e65f921..8763df67ef39 100644 --- a/p2p/simulations/mocker.go +++ b/p2p/simulations/mocker.go @@ -65,8 +65,13 @@ func startStop(net *Network, quit chan struct{}, nodeCount int) { if err != nil { panic("Could not startup node network for mocker") } - tick := time.NewTicker(10 * time.Second) + var ( + tick = time.NewTicker(10 * time.Second) + timer = time.NewTimer(3 * time.Second) + ) defer tick.Stop() + defer timer.Stop() + for { select { case <-quit: @@ -80,11 +85,12 @@ func startStop(net *Network, quit chan struct{}, nodeCount int) { return } + timer.Reset(3 * time.Second) select { case <-quit: log.Info("Terminating simulation loop") return - case <-time.After(3 * time.Second): + case <-timer.C: } log.Debug("starting node", "id", id) diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go index 0225a3bbaafb..2eb8333cd600 100644 --- a/p2p/simulations/network.go +++ b/p2p/simulations/network.go @@ -1028,11 +1028,14 @@ func (net *Network) Load(snap *Snapshot) error { } } + timeout := time.NewTimer(snapshotLoadTimeout) + defer timeout.Stop() + select { // Wait until all connections from the snapshot are established. case <-allConnected: // Make sure that we do not wait forever. - case <-time.After(snapshotLoadTimeout): + case <-timeout.C: return errors.New("snapshot connections not established") } return nil