Add rangeReqId so we can cancel all associated block req via single flag
bitwiseguy committed Apr 8, 2024
1 parent a3cc8f2 commit 201d280
Showing 1 changed file with 54 additions and 34 deletions.
op-node/p2p/sync.go: 88 changes (54 additions, 34 deletions)
@@ -8,6 +8,7 @@ import (
 	"fmt"
 	"io"
 	"math/big"
+	"math/rand"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -97,7 +98,8 @@ type syncResult struct {
 type peerRequest struct {
 	num uint64

-	complete *atomic.Bool
+	complete   *atomic.Bool
+	rangeReqId uint64
 }

 type inFlightCheck struct {
@@ -211,9 +213,10 @@ type SyncClient struct {
 	// inFlight requests are not repeated
 	inFlight map[uint64]*atomic.Bool

-	requests       chan rangeRequest
-	peerRequests   chan peerRequest
-	inFlightChecks chan inFlightCheck
+	rangeRequests       chan rangeRequest
+	activeRangeRequests map[uint64]bool
+	peerRequests        chan peerRequest
+	inFlightChecks      chan inFlightCheck

 	results chan syncResult

@@ -238,23 +241,24 @@ func NewSyncClient(log log.Logger, cfg *rollup.Config, newStream newStreamFn, rc
 	ctx, cancel := context.WithCancel(context.Background())

 	c := &SyncClient{
-		log:             log,
-		cfg:             cfg,
-		metrics:         metrics,
-		appScorer:       appScorer,
-		newStreamFn:     newStream,
-		payloadByNumber: PayloadByNumberProtocolID(cfg.L2ChainID),
-		peers:           make(map[peer.ID]context.CancelFunc),
-		quarantineByNum: make(map[uint64]common.Hash),
-		inFlight:        make(map[uint64]*atomic.Bool),
-		requests:        make(chan rangeRequest), // blocking
-		peerRequests:    make(chan peerRequest, 128),
-		results:         make(chan syncResult, 128),
-		inFlightChecks:  make(chan inFlightCheck, 128),
-		globalRL:        rate.NewLimiter(globalServerBlocksRateLimit, globalServerBlocksBurst),
-		resCtx:          ctx,
-		resCancel:       cancel,
-		receivePayload:  rcv,
+		log:                 log,
+		cfg:                 cfg,
+		metrics:             metrics,
+		appScorer:           appScorer,
+		newStreamFn:         newStream,
+		payloadByNumber:     PayloadByNumberProtocolID(cfg.L2ChainID),
+		peers:               make(map[peer.ID]context.CancelFunc),
+		quarantineByNum:     make(map[uint64]common.Hash),
+		inFlight:            make(map[uint64]*atomic.Bool),
+		rangeRequests:       make(chan rangeRequest), // blocking
+		activeRangeRequests: make(map[uint64]bool),
+		peerRequests:        make(chan peerRequest, 128),
+		results:             make(chan syncResult, 128),
+		inFlightChecks:      make(chan inFlightCheck, 128),
+		globalRL:            rate.NewLimiter(globalServerBlocksRateLimit, globalServerBlocksBurst),
+		resCtx:              ctx,
+		resCancel:           cancel,
+		receivePayload:      rcv,
 	}
 	// never errors with positive LRU cache size
 	// TODO(CLI-3733): if we had an LRU based on on total payloads size, instead of payload count,
@@ -320,7 +324,7 @@ func (s *SyncClient) RequestL2Range(ctx context.Context, start, end eth.L2BlockR
 	}
 	// synchronize requests with the main loop for state access
 	select {
-	case s.requests <- rangeRequest{start: start.Number, end: end}:
+	case s.rangeRequests <- rangeRequest{start: start.Number, end: end}:
 		return nil
 	case <-ctx.Done():
 		return fmt.Errorf("too busy with P2P results/requests: %w", ctx.Err())
@@ -336,7 +340,7 @@ func (s *SyncClient) mainLoop() {
 	defer s.wg.Done()
 	for {
 		select {
-		case req := <-s.requests:
+		case req := <-s.rangeRequests:
 			ctx, cancel := context.WithTimeout(s.resCtx, maxRequestScheduling)
 			s.onRangeRequest(ctx, req)
 			cancel()
@@ -382,6 +386,7 @@ func (s *SyncClient) onRangeRequest(ctx context.Context, req rangeRequest) {
 	s.trusted.Add(req.end.ParentHash, struct{}{})

 	log := s.log.New("target", req.start, "end", req.end)
+	log.Info("processing new L2 range request")

 	// clean up the completed in-flight requests
 	for k, v := range s.inFlight {
@@ -390,6 +395,10 @@ func (s *SyncClient) onRangeRequest(ctx context.Context, req rangeRequest) {
 		}
 	}

+	// Create shared reqId so associated peerRequests can all be cancelled by setting a single flag
+	randomReqId := rand.Uint64()
+	s.activeRangeRequests[randomReqId] = true
+
 	// Now try to fetch lower numbers than current end, to traverse back towards the updated start.
 	for i := uint64(0); ; i++ {
 		num := req.end.Number - 1 - i
@@ -410,7 +419,7 @@ func (s *SyncClient) onRangeRequest(ctx context.Context, req rangeRequest) {
 			log.Debug("request still in-flight, not rescheduling sync request", "num", num)
 			continue // request still in flight
 		}
-		pr := peerRequest{num: num, complete: new(atomic.Bool)}
+		pr := peerRequest{num: num, complete: new(atomic.Bool), rangeReqId: randomReqId}

 		log.Debug("Scheduling P2P block request", "num", num)
 		// schedule number
@@ -528,34 +537,45 @@ func (s *SyncClient) peerLoop(ctx context.Context, id peer.ID) {
 		// once the peer is available, wait for a sync request.
 		select {
 		case pr := <-s.peerRequests:
+			if !s.activeRangeRequests[pr.rangeReqId] {
+				log.Debug("dropping cancelled p2p sync request", "num", pr.num)
+				delete(s.inFlight, pr.num)
+				continue
+			}
+
 			// We already established the peer is available w.r.t. rate-limiting,
 			// and this is the only loop over this peer, so we can request now.
 			start := time.Now()
+
+			resultCode := byte(0)
 			err := s.doRequest(ctx, id, pr.num)
 			if err != nil {
 				// mark as complete if there's an error: we are not sending any result and can complete immediately.
 				pr.complete.Store(true)
 				log.Warn("failed p2p sync request", "num", pr.num, "err", err)
+
+				if re, ok := err.(requestResultErr); ok {
+					resultCode = re.ResultCode()
+					if re.ResultCode() == 1 { // indicates block not found error
+						log.Warn("cancelling p2p sync range request", "reqId", pr.rangeReqId)
+						delete(s.activeRangeRequests, pr.rangeReqId)
+					}
+				} else {
+					resultCode = 1
+				}
+
 				s.appScorer.onResponseError(id)
 				// If we hit an error, then count it as many requests.
-				// We'd like to avoid making more requests for a while, to back off.
+				// We'd like to avoid making more requests for a while, so back off.
 				if err := rl.WaitN(ctx, clientErrRateCost); err != nil {
 					return
 				}
 			} else {
 				log.Debug("completed p2p sync request", "num", pr.num)
 				s.appScorer.onValidResponse(id)
 			}
-			took := time.Since(start)
-
-			resultCode := byte(0)
-			if err != nil {
-				if re, ok := err.(requestResultErr); ok {
-					resultCode = re.ResultCode()
-				} else {
-					resultCode = 1
-				}
-			}
+			took := time.Since(start)
 			s.metrics.ClientPayloadByNumberEvent(pr.num, resultCode, took)
 		case <-ctx.Done():
 			return
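
Taken together, the change is a lazy-cancellation pattern: onRangeRequest tags every peerRequest it schedules with one randomly generated rangeReqId and records that id in activeRangeRequests, and each peerLoop drops any queued block request whose id is no longer in that map, so deleting a single map entry (here triggered by a "block not found" result) cancels every remaining request for the range. The snippet below is a minimal standalone sketch of that pattern, not the op-node code itself; the scheduler type and the scheduleRange, cancelRange, and worker names are invented for illustration, and it runs in a single goroutine so the map access is trivially safe.

```go
package main

import (
	"fmt"
	"math/rand"
)

// blockRequest mirrors the shape of peerRequest in the diff: each queued
// block request remembers which range request it belongs to.
type blockRequest struct {
	num        uint64
	rangeReqId uint64
}

type scheduler struct {
	activeRangeRequests map[uint64]bool // rangeReqId -> still active?
	queue               chan blockRequest
}

// scheduleRange tags every per-block request with one shared id, so the whole
// range can later be cancelled by removing a single map entry.
func (s *scheduler) scheduleRange(start, end uint64) uint64 {
	id := rand.Uint64()
	s.activeRangeRequests[id] = true
	for num := start; num < end; num++ {
		s.queue <- blockRequest{num: num, rangeReqId: id}
	}
	return id
}

// cancelRange marks the shared id inactive; already-queued requests are
// dropped lazily when a worker picks them up.
func (s *scheduler) cancelRange(id uint64) {
	delete(s.activeRangeRequests, id)
}

// worker drains the queue, skipping requests whose range was cancelled.
func (s *scheduler) worker() {
	for req := range s.queue {
		if !s.activeRangeRequests[req.rangeReqId] {
			fmt.Printf("dropping cancelled request for block %d\n", req.num)
			continue
		}
		fmt.Printf("fetching block %d\n", req.num)
	}
}

func main() {
	s := &scheduler{
		activeRangeRequests: make(map[uint64]bool),
		queue:               make(chan blockRequest, 16),
	}
	id := s.scheduleRange(100, 105)
	s.cancelRange(id) // e.g. after a "block not found" response
	close(s.queue)
	s.worker() // prints a "dropping cancelled request" line per queued block
}
```

Running the sketch prints a drop message for each queued block, which mirrors the log.Debug("dropping cancelled p2p sync request", ...) branch added to peerLoop above; the real client additionally clears the per-block inFlight flag there, presumably so the block can be requested again later.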