From 201d280ef0aff4fd74a207c1b8a299f3cd7562bb Mon Sep 17 00:00:00 2001 From: Samuel Stokes Date: Mon, 8 Apr 2024 16:38:04 -0400 Subject: [PATCH] Add rangeReqId so we can cancel all associated block req via single flag --- op-node/p2p/sync.go | 88 +++++++++++++++++++++++++++------------------ 1 file changed, 54 insertions(+), 34 deletions(-) diff --git a/op-node/p2p/sync.go b/op-node/p2p/sync.go index 6bf0fc242c0f..e8e9d11f286f 100644 --- a/op-node/p2p/sync.go +++ b/op-node/p2p/sync.go @@ -8,6 +8,7 @@ import ( "fmt" "io" "math/big" + "math/rand" "sync" "sync/atomic" "time" @@ -97,7 +98,8 @@ type syncResult struct { type peerRequest struct { num uint64 - complete *atomic.Bool + complete *atomic.Bool + rangeReqId uint64 } type inFlightCheck struct { @@ -211,9 +213,10 @@ type SyncClient struct { // inFlight requests are not repeated inFlight map[uint64]*atomic.Bool - requests chan rangeRequest - peerRequests chan peerRequest - inFlightChecks chan inFlightCheck + rangeRequests chan rangeRequest + activeRangeRequests map[uint64]bool + peerRequests chan peerRequest + inFlightChecks chan inFlightCheck results chan syncResult @@ -238,23 +241,24 @@ func NewSyncClient(log log.Logger, cfg *rollup.Config, newStream newStreamFn, rc ctx, cancel := context.WithCancel(context.Background()) c := &SyncClient{ - log: log, - cfg: cfg, - metrics: metrics, - appScorer: appScorer, - newStreamFn: newStream, - payloadByNumber: PayloadByNumberProtocolID(cfg.L2ChainID), - peers: make(map[peer.ID]context.CancelFunc), - quarantineByNum: make(map[uint64]common.Hash), - inFlight: make(map[uint64]*atomic.Bool), - requests: make(chan rangeRequest), // blocking - peerRequests: make(chan peerRequest, 128), - results: make(chan syncResult, 128), - inFlightChecks: make(chan inFlightCheck, 128), - globalRL: rate.NewLimiter(globalServerBlocksRateLimit, globalServerBlocksBurst), - resCtx: ctx, - resCancel: cancel, - receivePayload: rcv, + log: log, + cfg: cfg, + metrics: metrics, + appScorer: appScorer, + newStreamFn: newStream, + payloadByNumber: PayloadByNumberProtocolID(cfg.L2ChainID), + peers: make(map[peer.ID]context.CancelFunc), + quarantineByNum: make(map[uint64]common.Hash), + inFlight: make(map[uint64]*atomic.Bool), + rangeRequests: make(chan rangeRequest), // blocking + activeRangeRequests: make(map[uint64]bool), + peerRequests: make(chan peerRequest, 128), + results: make(chan syncResult, 128), + inFlightChecks: make(chan inFlightCheck, 128), + globalRL: rate.NewLimiter(globalServerBlocksRateLimit, globalServerBlocksBurst), + resCtx: ctx, + resCancel: cancel, + receivePayload: rcv, } // never errors with positive LRU cache size // TODO(CLI-3733): if we had an LRU based on on total payloads size, instead of payload count, @@ -320,7 +324,7 @@ func (s *SyncClient) RequestL2Range(ctx context.Context, start, end eth.L2BlockR } // synchronize requests with the main loop for state access select { - case s.requests <- rangeRequest{start: start.Number, end: end}: + case s.rangeRequests <- rangeRequest{start: start.Number, end: end}: return nil case <-ctx.Done(): return fmt.Errorf("too busy with P2P results/requests: %w", ctx.Err()) @@ -336,7 +340,7 @@ func (s *SyncClient) mainLoop() { defer s.wg.Done() for { select { - case req := <-s.requests: + case req := <-s.rangeRequests: ctx, cancel := context.WithTimeout(s.resCtx, maxRequestScheduling) s.onRangeRequest(ctx, req) cancel() @@ -382,6 +386,7 @@ func (s *SyncClient) onRangeRequest(ctx context.Context, req rangeRequest) { s.trusted.Add(req.end.ParentHash, struct{}{}) log := s.log.New("target", req.start, "end", req.end) + log.Info("processing new L2 range request") // clean up the completed in-flight requests for k, v := range s.inFlight { @@ -390,6 +395,10 @@ func (s *SyncClient) onRangeRequest(ctx context.Context, req rangeRequest) { } } + // Create shared reqId so associated peerRequests can all be cancelled by setting a single flag + randomReqId := rand.Uint64() + s.activeRangeRequests[randomReqId] = true + // Now try to fetch lower numbers than current end, to traverse back towards the updated start. for i := uint64(0); ; i++ { num := req.end.Number - 1 - i @@ -410,7 +419,7 @@ func (s *SyncClient) onRangeRequest(ctx context.Context, req rangeRequest) { log.Debug("request still in-flight, not rescheduling sync request", "num", num) continue // request still in flight } - pr := peerRequest{num: num, complete: new(atomic.Bool)} + pr := peerRequest{num: num, complete: new(atomic.Bool), rangeReqId: randomReqId} log.Debug("Scheduling P2P block request", "num", num) // schedule number @@ -528,17 +537,36 @@ func (s *SyncClient) peerLoop(ctx context.Context, id peer.ID) { // once the peer is available, wait for a sync request. select { case pr := <-s.peerRequests: + if !s.activeRangeRequests[pr.rangeReqId] { + log.Debug("dropping cancelled p2p sync request", "num", pr.num) + delete(s.inFlight, pr.num) + continue + } + // We already established the peer is available w.r.t. rate-limiting, // and this is the only loop over this peer, so we can request now. start := time.Now() + + resultCode := byte(0) err := s.doRequest(ctx, id, pr.num) if err != nil { // mark as complete if there's an error: we are not sending any result and can complete immediately. pr.complete.Store(true) log.Warn("failed p2p sync request", "num", pr.num, "err", err) + + if re, ok := err.(requestResultErr); ok { + resultCode = re.ResultCode() + if re.ResultCode() == 1 { // indicates block not found error + log.Warn("cancelling p2p sync range request", "reqId", pr.rangeReqId) + delete(s.activeRangeRequests, pr.rangeReqId) + } + } else { + resultCode = 1 + } + s.appScorer.onResponseError(id) // If we hit an error, then count it as many requests. - // We'd like to avoid making more requests for a while, to back off. + // We'd like to avoid making more requests for a while, so back off. if err := rl.WaitN(ctx, clientErrRateCost); err != nil { return } @@ -546,16 +574,8 @@ func (s *SyncClient) peerLoop(ctx context.Context, id peer.ID) { log.Debug("completed p2p sync request", "num", pr.num) s.appScorer.onValidResponse(id) } - took := time.Since(start) - resultCode := byte(0) - if err != nil { - if re, ok := err.(requestResultErr); ok { - resultCode = re.ResultCode() - } else { - resultCode = 1 - } - } + took := time.Since(start) s.metrics.ClientPayloadByNumberEvent(pr.num, resultCode, took) case <-ctx.Done(): return