From 8fba72be00e7c28a29cdef416df739954970a0da Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 12 Jun 2024 14:52:11 +0800 Subject: [PATCH] all: use timer instead of time.After in loops, to avoid memleaks (#97) all: use timer instead of time.After in loops, to avoid memleaks (#91) Co-authored-by: Ryan He <163962984+ryanmorphl2@users.noreply.github.com> --- core/bloombits/matcher.go | 8 ++++++-- eth/downloader/beaconsync.go | 6 +++++- eth/downloader/downloader.go | 12 ++++++++++-- ethstats/ethstats.go | 5 ++++- p2p/simulations/adapters/exec.go | 5 ++++- p2p/simulations/mocker.go | 10 ++++++++-- p2p/simulations/network.go | 5 ++++- 7 files changed, 41 insertions(+), 10 deletions(-) diff --git a/core/bloombits/matcher.go b/core/bloombits/matcher.go index 743a68282..8f4ada60a 100644 --- a/core/bloombits/matcher.go +++ b/core/bloombits/matcher.go @@ -596,6 +596,9 @@ func (s *MatcherSession) deliverSections(bit uint, sections []uint64, bitsets [] // of the session, any request in-flight need to be responded to! Empty responses // are fine though in that case. func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan *Retrieval) { + waitTimer := time.NewTimer(wait) + defer waitTimer.Stop() + for { // Allocate a new bloom bit index to retrieve data for, stopping when done bit, ok := s.allocateRetrieval() @@ -604,6 +607,7 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan } // Bit allocated, throttle a bit if we're below our batch limit if s.pendingSections(bit) < batch { + waitTimer.Reset(wait) select { case <-s.quit: // Session terminating, we can't meaningfully service, abort @@ -611,8 +615,8 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan s.deliverSections(bit, []uint64{}, [][]byte{}) return - case <-time.After(wait): - // Throttling up, fetch whatever's available + case <-waitTimer.C: + // Throttling up, fetch whatever is available } } // Allocate as much as we can handle and request servicing diff --git a/eth/downloader/beaconsync.go b/eth/downloader/beaconsync.go index f14df312b..6149785aa 100644 --- a/eth/downloader/beaconsync.go +++ b/eth/downloader/beaconsync.go @@ -289,6 +289,9 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error { localHeaders = d.readHeaderRange(tail, int(count)) log.Warn("Retrieved beacon headers from local", "from", from, "count", count) } + fsHeaderContCheckTimer := time.NewTimer(fsHeaderContCheck) + defer fsHeaderContCheckTimer.Stop() + for { // Some beacon headers might have appeared since the last cycle, make // sure we're always syncing to all available ones @@ -381,8 +384,9 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error { } // State sync still going, wait a bit for new headers and retry log.Trace("Pivot not yet committed, waiting...") + fsHeaderContCheckTimer.Reset(fsHeaderContCheck) select { - case <-time.After(fsHeaderContCheck): + case <-fsHeaderContCheckTimer.C: case <-d.cancelCh: return errCanceled } diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 92384c570..927018eff 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -1258,6 +1258,7 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode rollback uint64 // Zero means no rollback (fine as you can't unroll the genesis) rollbackErr error mode = d.getMode() + timer = time.NewTimer(time.Second) ) defer func() { if rollback > 0 { @@ -1284,6 +1285,8 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode // Wait for batches of headers to process gotHeaders := false + defer timer.Stop() + for { select { case <-d.cancelCh: @@ -1439,11 +1442,12 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode if mode == FullSync || mode == SnapSync { // If we've reached the allowed number of pending headers, stall a bit for d.queue.PendingBodies() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders { + timer.Reset(time.Second) select { case <-d.cancelCh: rollbackErr = errCanceled return errCanceled - case <-time.After(time.Second): + case <-timer.C: } } // Otherwise insert the headers for content retrieval @@ -1599,7 +1603,10 @@ func (d *Downloader) processSnapSyncContent() error { var ( oldPivot *fetchResult // Locked in pivot block, might change eventually oldTail []*fetchResult // Downloaded content after the pivot + timer = time.NewTimer(time.Second) ) + defer timer.Stop() + for { // Wait for the next batch of downloaded data to be available, and if the pivot // block became stale, move the goalpost @@ -1673,6 +1680,7 @@ func (d *Downloader) processSnapSyncContent() error { oldPivot = P } // Wait for completion, occasionally checking for pivot staleness + timer.Reset(time.Second) select { case <-sync.done: if sync.err != nil { @@ -1683,7 +1691,7 @@ func (d *Downloader) processSnapSyncContent() error { } oldPivot = nil - case <-time.After(time.Second): + case <-timer.C: oldTail = afterP continue } diff --git a/ethstats/ethstats.go b/ethstats/ethstats.go index d0533be8f..1e33b8010 100644 --- a/ethstats/ethstats.go +++ b/ethstats/ethstats.go @@ -541,10 +541,13 @@ func (s *Service) reportLatency(conn *connWrapper) error { return err } // Wait for the pong request to arrive back + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() + select { case <-s.pongCh: // Pong delivered, report the latency - case <-time.After(5 * time.Second): + case <-timer.C: // Ping timeout, abort return errors.New("ping timed out") } diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go index e9f880f86..40a96dbeb 100644 --- a/p2p/simulations/adapters/exec.go +++ b/p2p/simulations/adapters/exec.go @@ -303,10 +303,13 @@ func (n *ExecNode) Stop() error { go func() { waitErr <- n.Cmd.Wait() }() + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() + select { case err := <-waitErr: return err - case <-time.After(5 * time.Second): + case <-timer.C: return n.Cmd.Process.Kill() } } diff --git a/p2p/simulations/mocker.go b/p2p/simulations/mocker.go index e168cb12f..a29b1cd9e 100644 --- a/p2p/simulations/mocker.go +++ b/p2p/simulations/mocker.go @@ -65,8 +65,13 @@ func startStop(net *Network, quit chan struct{}, nodeCount int) { if err != nil { panic("Could not startup node network for mocker") } - tick := time.NewTicker(10 * time.Second) + var ( + tick = time.NewTicker(10 * time.Second) + timer = time.NewTimer(3 * time.Second) + ) defer tick.Stop() + defer timer.Stop() + for { select { case <-quit: @@ -80,11 +85,12 @@ func startStop(net *Network, quit chan struct{}, nodeCount int) { return } + timer.Reset(3 * time.Second) select { case <-quit: log.Info("Terminating simulation loop") return - case <-time.After(3 * time.Second): + case <-timer.C: } log.Debug("starting node", "id", id) diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go index 20e796036..56764a98a 100644 --- a/p2p/simulations/network.go +++ b/p2p/simulations/network.go @@ -1032,11 +1032,14 @@ func (net *Network) Load(snap *Snapshot) error { } } + timeout := time.NewTimer(snapshotLoadTimeout) + defer timeout.Stop() + select { // Wait until all connections from the snapshot are established. case <-allConnected: // Make sure that we do not wait forever. - case <-time.After(snapshotLoadTimeout): + case <-timeout.C: return errors.New("snapshot connections not established") } return nil