diff --git a/op-node/p2p/node.go b/op-node/p2p/node.go index b80801f7dbb5..8a0aee3afcf1 100644 --- a/op-node/p2p/node.go +++ b/op-node/p2p/node.go @@ -176,7 +176,8 @@ func (n *NodeP2P) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) if !n.AltSyncEnabled() { return fmt.Errorf("cannot request range %s - %s, req-resp sync is not enabled", start, end) } - return n.syncCl.RequestL2Range(ctx, start, end) + _, err := n.syncCl.RequestL2Range(ctx, start, end) + return err } func (n *NodeP2P) Host() host.Host { diff --git a/op-node/p2p/sync.go b/op-node/p2p/sync.go index 6bf0fc242c0f..e006aa4f4160 100644 --- a/op-node/p2p/sync.go +++ b/op-node/p2p/sync.go @@ -61,6 +61,13 @@ const ( clientErrRateCost = peerServerBlocksBurst ) +const ( + ResultCodeSuccess byte = 0 + ResultCodeNotFoundErr byte = 1 + ResultCodeInvalidErr byte = 2 + ResultCodeUnknownErr byte = 3 +) + func PayloadByNumberProtocolID(l2ChainID *big.Int) protocol.ID { return protocol.ID(fmt.Sprintf("/opstack/req/payload_by_number/%d/0", l2ChainID)) } @@ -87,6 +94,7 @@ type receivePayloadFn func(ctx context.Context, from peer.ID, payload *eth.Execu type rangeRequest struct { start uint64 end eth.L2BlockRef + id uint64 } type syncResult struct { @@ -95,17 +103,44 @@ type syncResult struct { } type peerRequest struct { - num uint64 - - complete *atomic.Bool + num uint64 + rangeReqId uint64 } type inFlightCheck struct { - num uint64 - + num uint64 result chan bool } +type requestIdMap struct { + requests map[uint64]bool + mu sync.Mutex +} + +func newRequestIdMap() *requestIdMap { + return &requestIdMap{ + requests: make(map[uint64]bool), + } +} + +func (r *requestIdMap) set(key uint64, value bool) { + r.mu.Lock() + r.requests[key] = value + r.mu.Unlock() +} + +func (r *requestIdMap) get(key uint64) bool { + r.mu.Lock() + defer r.mu.Unlock() + return r.requests[key] +} + +func (r *requestIdMap) delete(key uint64) { + r.mu.Lock() + delete(r.requests, key) + r.mu.Unlock() +} + type SyncClientMetrics interface { ClientPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration) PayloadsQuarantineSize(n int) @@ -209,12 +244,14 @@ type SyncClient struct { quarantineByNum map[uint64]common.Hash // inFlight requests are not repeated - inFlight map[uint64]*atomic.Bool - - requests chan rangeRequest - peerRequests chan peerRequest + inFlight *requestIdMap inFlightChecks chan inFlightCheck + rangeRequests chan rangeRequest + activeRangeRequests *requestIdMap + rangeReqId uint64 + peerRequests chan peerRequest + results chan syncResult receivePayload receivePayloadFn @@ -238,24 +275,26 @@ func NewSyncClient(log log.Logger, cfg *rollup.Config, newStream newStreamFn, rc ctx, cancel := context.WithCancel(context.Background()) c := &SyncClient{ - log: log, - cfg: cfg, - metrics: metrics, - appScorer: appScorer, - newStreamFn: newStream, - payloadByNumber: PayloadByNumberProtocolID(cfg.L2ChainID), - peers: make(map[peer.ID]context.CancelFunc), - quarantineByNum: make(map[uint64]common.Hash), - inFlight: make(map[uint64]*atomic.Bool), - requests: make(chan rangeRequest), // blocking - peerRequests: make(chan peerRequest, 128), - results: make(chan syncResult, 128), - inFlightChecks: make(chan inFlightCheck, 128), - globalRL: rate.NewLimiter(globalServerBlocksRateLimit, globalServerBlocksBurst), - resCtx: ctx, - resCancel: cancel, - receivePayload: rcv, + log: log, + cfg: cfg, + metrics: metrics, + appScorer: appScorer, + newStreamFn: newStream, + payloadByNumber: PayloadByNumberProtocolID(cfg.L2ChainID), + peers: make(map[peer.ID]context.CancelFunc), + quarantineByNum: make(map[uint64]common.Hash), + rangeRequests: make(chan rangeRequest), // blocking + activeRangeRequests: newRequestIdMap(), + peerRequests: make(chan peerRequest, 128), + results: make(chan syncResult, 128), + inFlight: newRequestIdMap(), + inFlightChecks: make(chan inFlightCheck, 128), + globalRL: rate.NewLimiter(globalServerBlocksRateLimit, globalServerBlocksBurst), + resCtx: ctx, + resCancel: cancel, + receivePayload: rcv, } + // never errors with positive LRU cache size // TODO(CLI-3733): if we had an LRU based on on total payloads size, instead of payload count, // we can safely buffer more data in the happy case. @@ -313,17 +352,23 @@ func (s *SyncClient) Close() error { return nil } -func (s *SyncClient) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error { +func (s *SyncClient) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) (uint64, error) { if end == (eth.L2BlockRef{}) { s.log.Debug("P2P sync client received range signal, but cannot sync open-ended chain: need sync target to verify blocks through parent-hashes", "start", start) - return nil + return 0, nil } + // Create shared rangeReqId so associated peerRequests can all be cancelled by setting a single flag + rangeReqId := atomic.AddUint64(&s.rangeReqId, 1) + // need to flag request as active before adding request to s.rangeRequests to avoid race + s.activeRangeRequests.set(rangeReqId, true) + // synchronize requests with the main loop for state access select { - case s.requests <- rangeRequest{start: start.Number, end: end}: - return nil + case s.rangeRequests <- rangeRequest{start: start.Number, end: end, id: rangeReqId}: + return rangeReqId, nil case <-ctx.Done(): - return fmt.Errorf("too busy with P2P results/requests: %w", ctx.Err()) + s.activeRangeRequests.delete(rangeReqId) + return rangeReqId, fmt.Errorf("too busy with P2P results/requests: %w", ctx.Err()) } } @@ -336,7 +381,7 @@ func (s *SyncClient) mainLoop() { defer s.wg.Done() for { select { - case req := <-s.requests: + case req := <-s.rangeRequests: ctx, cancel := context.WithTimeout(s.resCtx, maxRequestScheduling) s.onRangeRequest(ctx, req) cancel() @@ -346,12 +391,7 @@ func (s *SyncClient) mainLoop() { cancel() case check := <-s.inFlightChecks: s.log.Info("Checking in flight", "num", check.num) - complete, ok := s.inFlight[check.num] - if !ok { - check.result <- false - } else { - check.result <- !complete.Load() - } + check.result <- s.inFlight.get(check.num) case <-s.resCtx.Done(): s.log.Info("stopped P2P req-resp L2 block sync client") return @@ -377,19 +417,13 @@ func (s *SyncClient) isInFlight(ctx context.Context, num uint64) (bool, error) { // onRangeRequest is exclusively called by the main loop, and has thus direct access to the request bookkeeping state. // This function transforms requested block ranges into work for each peer. func (s *SyncClient) onRangeRequest(ctx context.Context, req rangeRequest) { + log := s.log.New("target", req.start, "end", req.end) + log.Info("processing L2 range request", "rangeReqId", req.id) + // add req head to trusted set of blocks s.trusted.Add(req.end.Hash, struct{}{}) s.trusted.Add(req.end.ParentHash, struct{}{}) - log := s.log.New("target", req.start, "end", req.end) - - // clean up the completed in-flight requests - for k, v := range s.inFlight { - if v.Load() { - delete(s.inFlight, k) - } - } - // Now try to fetch lower numbers than current end, to traverse back towards the updated start. for i := uint64(0); ; i++ { num := req.end.Number - 1 - i @@ -406,17 +440,17 @@ func (s *SyncClient) onRangeRequest(ctx context.Context, req rangeRequest) { continue } - if _, ok := s.inFlight[num]; ok { + if s.inFlight.get(num) { log.Debug("request still in-flight, not rescheduling sync request", "num", num) continue // request still in flight } - pr := peerRequest{num: num, complete: new(atomic.Bool)} + pr := peerRequest{num: num, rangeReqId: req.id} - log.Debug("Scheduling P2P block request", "num", num) + log.Debug("Scheduling P2P block request", "num", num, "rangeReqId", req.id) // schedule number select { case s.peerRequests <- pr: - s.inFlight[num] = pr.complete + s.inFlight.set(num, true) case <-ctx.Done(): log.Info("did not schedule full P2P sync range", "current", num, "err", ctx.Err()) return @@ -487,7 +521,7 @@ func (s *SyncClient) onResult(ctx context.Context, res syncResult) { payload := res.payload.ExecutionPayload s.log.Debug("processing p2p sync result", "payload", payload.ID(), "peer", res.peer) // Clean up the in-flight request, we have a result now. - delete(s.inFlight, uint64(payload.BlockNumber)) + s.inFlight.delete(uint64(payload.BlockNumber)) // Always put it in quarantine first. If promotion fails because the receiver is too busy, this functions as cache. s.quarantine.Add(payload.BlockHash, res) s.quarantineByNum[uint64(payload.BlockNumber)] = payload.BlockHash @@ -528,17 +562,39 @@ func (s *SyncClient) peerLoop(ctx context.Context, id peer.ID) { // once the peer is available, wait for a sync request. select { case pr := <-s.peerRequests: + if !s.activeRangeRequests.get(pr.rangeReqId) { + log.Debug("dropping cancelled p2p sync request", "num", pr.num) + s.inFlight.delete(pr.num) + continue + } + // We already established the peer is available w.r.t. rate-limiting, // and this is the only loop over this peer, so we can request now. start := time.Now() + + resultCode := ResultCodeSuccess err := s.doRequest(ctx, id, pr.num) if err != nil { - // mark as complete if there's an error: we are not sending any result and can complete immediately. - pr.complete.Store(true) + s.inFlight.delete(pr.num) log.Warn("failed p2p sync request", "num", pr.num, "err", err) - s.appScorer.onResponseError(id) + resultCode = ResultCodeNotFoundErr + sendResponseError := true + + if re, ok := err.(requestResultErr); ok { + resultCode = re.ResultCode() + if resultCode == ResultCodeNotFoundErr { + log.Warn("cancelling p2p sync range request", "rangeReqId", pr.rangeReqId) + s.activeRangeRequests.delete(pr.rangeReqId) + sendResponseError = false // don't penalize peer for this error + } + } + + if sendResponseError { + s.appScorer.onResponseError(id) + } + // If we hit an error, then count it as many requests. - // We'd like to avoid making more requests for a while, to back off. + // We'd like to avoid making more requests for a while, so back off. if err := rl.WaitN(ctx, clientErrRateCost); err != nil { return } @@ -546,16 +602,8 @@ func (s *SyncClient) peerLoop(ctx context.Context, id peer.ID) { log.Debug("completed p2p sync request", "num", pr.num) s.appScorer.onValidResponse(id) } - took := time.Since(start) - resultCode := byte(0) - if err != nil { - if re, ok := err.(requestResultErr); ok { - resultCode = re.ResultCode() - } else { - resultCode = 1 - } - } + took := time.Since(start) s.metrics.ClientPayloadByNumberEvent(pr.num, resultCode, took) case <-ctx.Done(): return @@ -740,22 +788,22 @@ func (srv *ReqRespServer) HandleSyncRequest(ctx context.Context, log log.Logger, req, err := srv.handleSyncRequest(ctx, stream) cancel() - resultCode := byte(0) + resultCode := ResultCodeSuccess if err != nil { log.Warn("failed to serve p2p sync request", "req", req, "err", err) if errors.Is(err, ethereum.NotFound) { - resultCode = 1 + resultCode = ResultCodeNotFoundErr } else if errors.Is(err, invalidRequestErr) { - resultCode = 2 + resultCode = ResultCodeInvalidErr } else { - resultCode = 3 + resultCode = ResultCodeUnknownErr } // try to write error code, so the other peer can understand the reason for failure. _, _ = stream.Write([]byte{resultCode}) } else { log.Debug("successfully served sync response", "req", req) } - srv.metrics.ServerPayloadByNumberEvent(req, 0, time.Since(start)) + srv.metrics.ServerPayloadByNumberEvent(req, resultCode, time.Since(start)) } var invalidRequestErr = errors.New("invalid request") diff --git a/op-node/p2p/sync_test.go b/op-node/p2p/sync_test.go index 0c43778ce8f0..cd44fb521473 100644 --- a/op-node/p2p/sync_test.go +++ b/op-node/p2p/sync_test.go @@ -169,7 +169,8 @@ func TestSinglePeerSync(t *testing.T) { defer cl.Close() // request to start syncing between 10 and 20 - require.NoError(t, cl.RequestL2Range(ctx, payloads.getBlockRef(10), payloads.getBlockRef(20))) + _, err = cl.RequestL2Range(ctx, payloads.getBlockRef(10), payloads.getBlockRef(20)) + require.NoError(t, err) // and wait for the sync results to come in (in reverse order) for i := uint64(19); i > 10; i-- { @@ -255,7 +256,8 @@ func TestMultiPeerSync(t *testing.T) { defer clC.Close() // request to start syncing between 10 and 90 - require.NoError(t, clA.RequestL2Range(ctx, payloads.getBlockRef(10), payloads.getBlockRef(90))) + _, err = clA.RequestL2Range(ctx, payloads.getBlockRef(10), payloads.getBlockRef(90)) + require.NoError(t, err) // With such large range to request we are going to hit the rate-limits of B and C, // but that means we'll balance the work between the peers. @@ -270,13 +272,18 @@ func TestMultiPeerSync(t *testing.T) { // now see if B can sync a range, and fill the gap with a re-request bl25, _ := payloads.getPayload(25) // temporarily remove it from the available payloads. This will create a gap payloads.deletePayload(25) - require.NoError(t, clB.RequestL2Range(ctx, payloads.getBlockRef(20), payloads.getBlockRef(30))) + rangeReqId, err := clB.RequestL2Range(ctx, payloads.getBlockRef(20), payloads.getBlockRef(30)) + + require.NoError(t, err) + require.True(t, clB.activeRangeRequests.get(rangeReqId), "expecting range request to be active") + for i := uint64(29); i > 25; i-- { p := <-recvB exp, ok := payloads.getPayload(uint64(p.ExecutionPayload.BlockNumber)) require.True(t, ok, "expecting known payload") require.Equal(t, exp.ExecutionPayload.BlockHash, p.ExecutionPayload.BlockHash, "expecting the correct payload") } + // Wait for the request for block 25 to be made ctx, cancelFunc := context.WithTimeout(context.Background(), 30*time.Second) defer cancelFunc() @@ -291,12 +298,12 @@ func TestMultiPeerSync(t *testing.T) { t.Fatal("Did not request block 25 in a reasonable time") } } + // the request for 25 should fail. See: // server: WARN peer requested unknown block by number num=25 // client: WARN failed p2p sync request num=25 err="peer failed to serve request with code 1" require.Zero(t, len(recvB), "there is a gap, should not see other payloads yet") - // Add back the block - payloads.addPayload(bl25) + // race-condition fix: the request for 25 is expected to error, but is marked as complete in the peer-loop. // But the re-request checks the status in the main loop, and it may thus look like it's still in-flight, // and thus not run the new request. @@ -306,13 +313,18 @@ func TestMultiPeerSync(t *testing.T) { for { isInFlight, err := clB.isInFlight(ctx, 25) require.NoError(t, err) + time.Sleep(time.Second) if !isInFlight { break } - time.Sleep(time.Second) } + require.False(t, clB.activeRangeRequests.get(rangeReqId), "expecting range request to be cancelled") + + // Add back the block + payloads.addPayload(bl25) // And request a range again, 25 is there now, and 21-24 should follow quickly (some may already have been fetched and wait in quarantine) - require.NoError(t, clB.RequestL2Range(ctx, payloads.getBlockRef(20), payloads.getBlockRef(26))) + _, err = clB.RequestL2Range(ctx, payloads.getBlockRef(20), payloads.getBlockRef(26)) + require.NoError(t, err) for i := uint64(25); i > 20; i-- { p := <-recvB exp, ok := payloads.getPayload(uint64(p.ExecutionPayload.BlockNumber))