Skip to content

Commit

Permalink
Use same struct for activeRangeRequests and inFlight
Browse files Browse the repository at this point in the history
  • Loading branch information
bitwiseguy committed Apr 17, 2024
1 parent 32f9860 commit 74bd258
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 39 deletions.
60 changes: 27 additions & 33 deletions op-node/p2p/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,33 +112,33 @@ type inFlightCheck struct {
result chan bool
}

type inFlight struct {
// requestIdMap is a mutex-guarded set of request IDs, keyed by uint64.
// A true value marks the ID as present/active; absent keys read as false.
type requestIdMap struct {
	// mu guards requests; take it for every read and write.
	mu       sync.Mutex
	requests map[uint64]bool
}

func newInFlight() *inFlight {
return &inFlight{
// newRequestIdMap returns an empty requestIdMap ready for use.
// The inner map is allocated here so set/get/delete never touch a nil map.
func newRequestIdMap() *requestIdMap {
	m := new(requestIdMap)
	m.requests = map[uint64]bool{}
	return m
}

func (s *inFlight) set(key uint64, value bool) {
s.mu.Lock()
s.requests[key] = value
s.mu.Unlock()
// set records value for key under the lock, overwriting any prior entry.
func (r *requestIdMap) set(key uint64, value bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.requests[key] = value
}

func (s *inFlight) get(key uint64) bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.requests[key]
// get reports the stored value for key; missing keys yield false
// (the map's zero value).
func (r *requestIdMap) get(key uint64) bool {
	r.mu.Lock()
	active := r.requests[key]
	r.mu.Unlock()
	return active
}

func (s *inFlight) delete(key uint64) {
s.mu.Lock()
delete(s.requests, key)
s.mu.Unlock()
// delete removes key under the lock; deleting an absent key is a no-op.
func (r *requestIdMap) delete(key uint64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.requests, key)
}

type SyncClientMetrics interface {
Expand Down Expand Up @@ -244,14 +244,13 @@ type SyncClient struct {
quarantineByNum map[uint64]common.Hash

// inFlight requests are not repeated
inFlight *inFlight
inFlight *requestIdMap
inFlightChecks chan inFlightCheck

rangeRequests chan rangeRequest
activeRangeRequests map[uint64]bool
activeRangeRequestsMu sync.Mutex
rangeReqId uint64
peerRequests chan peerRequest
rangeRequests chan rangeRequest
activeRangeRequests *requestIdMap
rangeReqId uint64
peerRequests chan peerRequest

results chan syncResult

Expand Down Expand Up @@ -285,10 +284,10 @@ func NewSyncClient(log log.Logger, cfg *rollup.Config, newStream newStreamFn, rc
peers: make(map[peer.ID]context.CancelFunc),
quarantineByNum: make(map[uint64]common.Hash),
rangeRequests: make(chan rangeRequest), // blocking
activeRangeRequests: make(map[uint64]bool),
activeRangeRequests: newRequestIdMap(),
peerRequests: make(chan peerRequest, 128),
results: make(chan syncResult, 128),
inFlight: newInFlight(),
inFlight: newRequestIdMap(),
inFlightChecks: make(chan inFlightCheck, 128),
globalRL: rate.NewLimiter(globalServerBlocksRateLimit, globalServerBlocksBurst),
resCtx: ctx,
Expand Down Expand Up @@ -360,15 +359,15 @@ func (s *SyncClient) RequestL2Range(ctx context.Context, start, end eth.L2BlockR
}
// Create shared rangeReqId so associated peerRequests can all be cancelled by setting a single flag
rangeReqId := atomic.AddUint64(&s.rangeReqId, 1)
s.activeRangeRequestsMu.Lock()
defer s.activeRangeRequestsMu.Unlock()
// need to flag request as active before adding request to s.rangeRequests to avoid race
s.activeRangeRequests.set(rangeReqId, true)

// synchronize requests with the main loop for state access
select {
case s.rangeRequests <- rangeRequest{start: start.Number, end: end, id: rangeReqId}:
s.activeRangeRequests[rangeReqId] = true
return rangeReqId, nil
case <-ctx.Done():
s.activeRangeRequests.delete(rangeReqId)
return rangeReqId, fmt.Errorf("too busy with P2P results/requests: %w", ctx.Err())
}
}
Expand Down Expand Up @@ -563,10 +562,7 @@ func (s *SyncClient) peerLoop(ctx context.Context, id peer.ID) {
// once the peer is available, wait for a sync request.
select {
case pr := <-s.peerRequests:
s.activeRangeRequestsMu.Lock()
isActive := s.activeRangeRequests[pr.rangeReqId]
s.activeRangeRequestsMu.Unlock()
if !isActive {
if !s.activeRangeRequests.get(pr.rangeReqId) {
log.Debug("dropping cancelled p2p sync request", "num", pr.num)
s.inFlight.delete(pr.num)
continue
Expand All @@ -588,9 +584,7 @@ func (s *SyncClient) peerLoop(ctx context.Context, id peer.ID) {
resultCode = re.ResultCode()
if resultCode == ResultCodeNotFoundErr {
log.Warn("cancelling p2p sync range request", "rangeReqId", pr.rangeReqId)
s.activeRangeRequestsMu.Lock()
delete(s.activeRangeRequests, pr.rangeReqId)
s.activeRangeRequestsMu.Unlock()
s.activeRangeRequests.delete(pr.rangeReqId)
sendResponseError = false // don't penalize peer for this error
}
}
Expand Down
8 changes: 2 additions & 6 deletions op-node/p2p/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,8 @@ func TestMultiPeerSync(t *testing.T) {
payloads.deletePayload(25)
rangeReqId, err := clB.RequestL2Range(ctx, payloads.getBlockRef(20), payloads.getBlockRef(30))

clB.activeRangeRequestsMu.Lock()
require.NoError(t, err)
require.True(t, clB.activeRangeRequests[rangeReqId], "expecting range request to be active")
clB.activeRangeRequestsMu.Unlock()
require.True(t, clB.activeRangeRequests.get(rangeReqId), "expecting range request to be active")

for i := uint64(29); i > 25; i-- {
p := <-recvB
Expand Down Expand Up @@ -320,9 +318,7 @@ func TestMultiPeerSync(t *testing.T) {
break
}
}
clB.activeRangeRequestsMu.Lock()
require.False(t, clB.activeRangeRequests[rangeReqId], "expecting range request to be cancelled")
clB.activeRangeRequestsMu.Unlock()
require.False(t, clB.activeRangeRequests.get(rangeReqId), "expecting range request to be cancelled")

// Add back the block
payloads.addPayload(bl25)
Expand Down

0 comments on commit 74bd258

Please sign in to comment.