op-node: prevent spamming of reqs for blocks triggered by checkForGapInUnsafeQueue #10063

Merged
merged 12 commits on Apr 23, 2024
3 changes: 2 additions & 1 deletion op-node/p2p/node.go
@@ -176,7 +176,8 @@ func (n *NodeP2P) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef)
if !n.AltSyncEnabled() {
return fmt.Errorf("cannot request range %s - %s, req-resp sync is not enabled", start, end)
}
return n.syncCl.RequestL2Range(ctx, start, end)
_, err := n.syncCl.RequestL2Range(ctx, start, end)
return err
}

func (n *NodeP2P) Host() host.Host {
188 changes: 118 additions & 70 deletions op-node/p2p/sync.go
@@ -61,6 +61,13 @@ const (
clientErrRateCost = peerServerBlocksBurst
)

const (
ResultCodeSuccess byte = 0
ResultCodeNotFoundErr byte = 1
ResultCodeInvalidErr byte = 2
ResultCodeUnknownErr byte = 3
)
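These named result codes replace the bare numeric literals (0-3) previously passed to the client and server metrics hooks. As a rough illustration only (the real sync client uses its own requestResultErr type for this purpose), a byte-backed error type is enough to carry a non-zero code back to the metrics call; the names below are hypothetical, not part of this PR, and assume the usual "fmt" import:

// resultCodeErr is a hypothetical sketch of an error that exposes the result
// code so it can be forwarded to ClientPayloadByNumberEvent.
type resultCodeErr byte

func (e resultCodeErr) Error() string    { return fmt.Sprintf("peer returned result code %d", byte(e)) }
func (e resultCodeErr) ResultCode() byte { return byte(e) }

func codeToErr(code byte) error {
	if code == ResultCodeSuccess {
		return nil
	}
	return resultCodeErr(code)
}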

func PayloadByNumberProtocolID(l2ChainID *big.Int) protocol.ID {
return protocol.ID(fmt.Sprintf("/opstack/req/payload_by_number/%d/0", l2ChainID))
}
@@ -87,6 +94,7 @@ type receivePayloadFn func(ctx context.Context, from peer.ID, payload *eth.Execu
type rangeRequest struct {
start uint64
end eth.L2BlockRef
id uint64
}

type syncResult struct {
@@ -95,17 +103,44 @@ type syncResult struct {
}

type peerRequest struct {
num uint64

complete *atomic.Bool
num uint64
rangeReqId uint64
}

type inFlightCheck struct {
num uint64

num uint64
result chan bool
}

type requestIdMap struct {
requests map[uint64]bool
mu sync.Mutex
}

func newRequestIdMap() *requestIdMap {
return &requestIdMap{
requests: make(map[uint64]bool),
}
}

func (r *requestIdMap) set(key uint64, value bool) {
r.mu.Lock()
r.requests[key] = value
r.mu.Unlock()
}

func (r *requestIdMap) get(key uint64) bool {
r.mu.Lock()
defer r.mu.Unlock()
return r.requests[key]
}

func (r *requestIdMap) delete(key uint64) {
r.mu.Lock()
delete(r.requests, key)
r.mu.Unlock()
}
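requestIdMap is a small mutex-guarded set of request ids; both the in-flight block numbers and the active range-request ids are tracked with it. A minimal sketch of the cancellation pattern it enables (the function and variable names here are illustrative, not part of the PR):

// cancelSketch shows the intended flow: one shared flag per range request,
// checked by workers and deleted once to cancel all associated peer requests.
func cancelSketch() {
	active := newRequestIdMap()

	const rangeReqId uint64 = 1
	active.set(rangeReqId, true) // flag as active before any worker can observe the request

	// Worker side: drop work whose range request has been cancelled.
	if !active.get(rangeReqId) {
		return
	}

	// Cancel side: deleting the single flag invalidates every outstanding
	// peer request that still carries this rangeReqId.
	active.delete(rangeReqId)
}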

type SyncClientMetrics interface {
ClientPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration)
PayloadsQuarantineSize(n int)
@@ -209,12 +244,14 @@ type SyncClient struct {
quarantineByNum map[uint64]common.Hash

// inFlight requests are not repeated
inFlight map[uint64]*atomic.Bool

requests chan rangeRequest
peerRequests chan peerRequest
inFlight *requestIdMap
inFlightChecks chan inFlightCheck

rangeRequests chan rangeRequest
activeRangeRequests *requestIdMap
rangeReqId uint64
peerRequests chan peerRequest

results chan syncResult

receivePayload receivePayloadFn
@@ -238,24 +275,26 @@ func NewSyncClient(log log.Logger, cfg *rollup.Config, newStream newStreamFn, rc
ctx, cancel := context.WithCancel(context.Background())

c := &SyncClient{
log: log,
cfg: cfg,
metrics: metrics,
appScorer: appScorer,
newStreamFn: newStream,
payloadByNumber: PayloadByNumberProtocolID(cfg.L2ChainID),
peers: make(map[peer.ID]context.CancelFunc),
quarantineByNum: make(map[uint64]common.Hash),
inFlight: make(map[uint64]*atomic.Bool),
requests: make(chan rangeRequest), // blocking
peerRequests: make(chan peerRequest, 128),
results: make(chan syncResult, 128),
inFlightChecks: make(chan inFlightCheck, 128),
globalRL: rate.NewLimiter(globalServerBlocksRateLimit, globalServerBlocksBurst),
resCtx: ctx,
resCancel: cancel,
receivePayload: rcv,
log: log,
cfg: cfg,
metrics: metrics,
appScorer: appScorer,
newStreamFn: newStream,
payloadByNumber: PayloadByNumberProtocolID(cfg.L2ChainID),
peers: make(map[peer.ID]context.CancelFunc),
quarantineByNum: make(map[uint64]common.Hash),
rangeRequests: make(chan rangeRequest), // blocking
activeRangeRequests: newRequestIdMap(),
peerRequests: make(chan peerRequest, 128),
results: make(chan syncResult, 128),
inFlight: newRequestIdMap(),
inFlightChecks: make(chan inFlightCheck, 128),
globalRL: rate.NewLimiter(globalServerBlocksRateLimit, globalServerBlocksBurst),
resCtx: ctx,
resCancel: cancel,
receivePayload: rcv,
}

// never errors with positive LRU cache size
// TODO(CLI-3733): if we had an LRU based on on total payloads size, instead of payload count,
// we can safely buffer more data in the happy case.
@@ -313,17 +352,23 @@ func (s *SyncClient) Close() error {
return nil
}

func (s *SyncClient) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error {
func (s *SyncClient) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) (uint64, error) {
if end == (eth.L2BlockRef{}) {
s.log.Debug("P2P sync client received range signal, but cannot sync open-ended chain: need sync target to verify blocks through parent-hashes", "start", start)
return nil
return 0, nil
}
// Create shared rangeReqId so associated peerRequests can all be cancelled by setting a single flag
rangeReqId := atomic.AddUint64(&s.rangeReqId, 1)
// need to flag request as active before adding request to s.rangeRequests to avoid race
s.activeRangeRequests.set(rangeReqId, true)

// synchronize requests with the main loop for state access
select {
case s.requests <- rangeRequest{start: start.Number, end: end}:
return nil
case s.rangeRequests <- rangeRequest{start: start.Number, end: end, id: rangeReqId}:
return rangeReqId, nil
case <-ctx.Done():
return fmt.Errorf("too busy with P2P results/requests: %w", ctx.Err())
s.activeRangeRequests.delete(rangeReqId)
return rangeReqId, fmt.Errorf("too busy with P2P results/requests: %w", ctx.Err())
}
}
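RequestL2Range now returns the shared range-request id alongside the error; NodeP2P.RequestL2Range in node.go above discards it, but callers that want to correlate or cancel the request later can keep it. A hedged caller sketch, assuming a *SyncClient named sc and existing start/end refs (not taken from this PR):

// requestRangeSketch schedules a range request and logs the id it was assigned.
func requestRangeSketch(ctx context.Context, sc *SyncClient, start, end eth.L2BlockRef, log log.Logger) error {
	id, err := sc.RequestL2Range(ctx, start, end)
	if err != nil {
		// The main loop was too busy; per the select above, the active flag
		// for this id has already been deleted again.
		return err
	}
	log.Debug("scheduled P2P range request", "rangeReqId", id)
	return nil
}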

@@ -336,7 +381,7 @@ func (s *SyncClient) mainLoop() {
defer s.wg.Done()
for {
select {
case req := <-s.requests:
case req := <-s.rangeRequests:
ctx, cancel := context.WithTimeout(s.resCtx, maxRequestScheduling)
s.onRangeRequest(ctx, req)
cancel()
@@ -346,12 +391,7 @@ func (s *SyncClient) mainLoop() {
cancel()
case check := <-s.inFlightChecks:
s.log.Info("Checking in flight", "num", check.num)
complete, ok := s.inFlight[check.num]
if !ok {
check.result <- false
} else {
check.result <- !complete.Load()
}
check.result <- s.inFlight.get(check.num)
case <-s.resCtx.Done():
s.log.Info("stopped P2P req-resp L2 block sync client")
return
@@ -377,19 +417,13 @@ func (s *SyncClient) isInFlight(ctx context.Context, num uint64) (bool, error) {
// onRangeRequest is exclusively called by the main loop, and has thus direct access to the request bookkeeping state.
// This function transforms requested block ranges into work for each peer.
func (s *SyncClient) onRangeRequest(ctx context.Context, req rangeRequest) {
log := s.log.New("target", req.start, "end", req.end)
log.Info("processing L2 range request", "rangeReqId", req.id)

// add req head to trusted set of blocks
s.trusted.Add(req.end.Hash, struct{}{})
s.trusted.Add(req.end.ParentHash, struct{}{})

log := s.log.New("target", req.start, "end", req.end)

// clean up the completed in-flight requests
for k, v := range s.inFlight {
if v.Load() {
delete(s.inFlight, k)
}
}

// Now try to fetch lower numbers than current end, to traverse back towards the updated start.
for i := uint64(0); ; i++ {
num := req.end.Number - 1 - i
@@ -406,17 +440,17 @@ func (s *SyncClient) onRangeRequest(ctx context.Context, req rangeRequest) {
continue
}

if _, ok := s.inFlight[num]; ok {
if s.inFlight.get(num) {
log.Debug("request still in-flight, not rescheduling sync request", "num", num)
continue // request still in flight
}
pr := peerRequest{num: num, complete: new(atomic.Bool)}
pr := peerRequest{num: num, rangeReqId: req.id}

log.Debug("Scheduling P2P block request", "num", num)
log.Debug("Scheduling P2P block request", "num", num, "rangeReqId", req.id)
// schedule number
select {
case s.peerRequests <- pr:
s.inFlight[num] = pr.complete
s.inFlight.set(num, true)
case <-ctx.Done():
log.Info("did not schedule full P2P sync range", "current", num, "err", ctx.Err())
return
@@ -487,7 +521,7 @@ func (s *SyncClient) onResult(ctx context.Context, res syncResult) {
payload := res.payload.ExecutionPayload
s.log.Debug("processing p2p sync result", "payload", payload.ID(), "peer", res.peer)
// Clean up the in-flight request, we have a result now.
delete(s.inFlight, uint64(payload.BlockNumber))
s.inFlight.delete(uint64(payload.BlockNumber))
// Always put it in quarantine first. If promotion fails because the receiver is too busy, this functions as cache.
s.quarantine.Add(payload.BlockHash, res)
s.quarantineByNum[uint64(payload.BlockNumber)] = payload.BlockHash
@@ -528,34 +562,48 @@ func (s *SyncClient) peerLoop(ctx context.Context, id peer.ID) {
// once the peer is available, wait for a sync request.
select {
case pr := <-s.peerRequests:
if !s.activeRangeRequests.get(pr.rangeReqId) {
log.Debug("dropping cancelled p2p sync request", "num", pr.num)
s.inFlight.delete(pr.num)
continue
}

// We already established the peer is available w.r.t. rate-limiting,
// and this is the only loop over this peer, so we can request now.
start := time.Now()

resultCode := ResultCodeSuccess
err := s.doRequest(ctx, id, pr.num)
if err != nil {
// mark as complete if there's an error: we are not sending any result and can complete immediately.
pr.complete.Store(true)
s.inFlight.delete(pr.num)
log.Warn("failed p2p sync request", "num", pr.num, "err", err)
s.appScorer.onResponseError(id)
resultCode = ResultCodeNotFoundErr
sendResponseError := true

if re, ok := err.(requestResultErr); ok {
resultCode = re.ResultCode()
if resultCode == ResultCodeNotFoundErr {
log.Warn("cancelling p2p sync range request", "rangeReqId", pr.rangeReqId)
s.activeRangeRequests.delete(pr.rangeReqId)
sendResponseError = false // don't penalize peer for this error
}
}

if sendResponseError {
s.appScorer.onResponseError(id)
}

// If we hit an error, then count it as many requests.
// We'd like to avoid making more requests for a while, to back off.
// We'd like to avoid making more requests for a while, so back off.
if err := rl.WaitN(ctx, clientErrRateCost); err != nil {
return
}
} else {
log.Debug("completed p2p sync request", "num", pr.num)
s.appScorer.onValidResponse(id)
}
took := time.Since(start)

resultCode := byte(0)
if err != nil {
if re, ok := err.(requestResultErr); ok {
resultCode = re.ResultCode()
} else {
resultCode = 1
}
}
took := time.Since(start)
s.metrics.ClientPayloadByNumberEvent(pr.num, resultCode, took)
case <-ctx.Done():
return
@@ -740,22 +788,22 @@ func (srv *ReqRespServer) HandleSyncRequest(ctx context.Context, log log.Logger,
req, err := srv.handleSyncRequest(ctx, stream)
cancel()

resultCode := byte(0)
resultCode := ResultCodeSuccess
if err != nil {
log.Warn("failed to serve p2p sync request", "req", req, "err", err)
if errors.Is(err, ethereum.NotFound) {
resultCode = 1
resultCode = ResultCodeNotFoundErr
} else if errors.Is(err, invalidRequestErr) {
resultCode = 2
resultCode = ResultCodeInvalidErr
} else {
resultCode = 3
resultCode = ResultCodeUnknownErr
}
// try to write error code, so the other peer can understand the reason for failure.
_, _ = stream.Write([]byte{resultCode})
} else {
log.Debug("successfully served sync response", "req", req)
}
srv.metrics.ServerPayloadByNumberEvent(req, 0, time.Since(start))
srv.metrics.ServerPayloadByNumberEvent(req, resultCode, time.Since(start))
}

var invalidRequestErr = errors.New("invalid request")