From 74bd2584ec5bf6a372f82097f0eb477f7cb7c4ff Mon Sep 17 00:00:00 2001 From: Samuel Stokes Date: Wed, 17 Apr 2024 09:59:10 -0400 Subject: [PATCH] Use same struct for activeRangeRequests and inFlight --- op-node/p2p/sync.go | 60 ++++++++++++++++++---------------------- op-node/p2p/sync_test.go | 8 ++---- 2 files changed, 29 insertions(+), 39 deletions(-) diff --git a/op-node/p2p/sync.go b/op-node/p2p/sync.go index 1c5dd7c8247a..e006aa4f4160 100644 --- a/op-node/p2p/sync.go +++ b/op-node/p2p/sync.go @@ -112,33 +112,33 @@ type inFlightCheck struct { result chan bool } -type inFlight struct { +type requestIdMap struct { requests map[uint64]bool mu sync.Mutex } -func newInFlight() *inFlight { - return &inFlight{ +func newRequestIdMap() *requestIdMap { + return &requestIdMap{ requests: make(map[uint64]bool), } } -func (s *inFlight) set(key uint64, value bool) { - s.mu.Lock() - s.requests[key] = value - s.mu.Unlock() +func (r *requestIdMap) set(key uint64, value bool) { + r.mu.Lock() + r.requests[key] = value + r.mu.Unlock() } -func (s *inFlight) get(key uint64) bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.requests[key] +func (r *requestIdMap) get(key uint64) bool { + r.mu.Lock() + defer r.mu.Unlock() + return r.requests[key] } -func (s *inFlight) delete(key uint64) { - s.mu.Lock() - delete(s.requests, key) - s.mu.Unlock() +func (r *requestIdMap) delete(key uint64) { + r.mu.Lock() + delete(r.requests, key) + r.mu.Unlock() } type SyncClientMetrics interface { @@ -244,14 +244,13 @@ type SyncClient struct { quarantineByNum map[uint64]common.Hash // inFlight requests are not repeated - inFlight *inFlight + inFlight *requestIdMap inFlightChecks chan inFlightCheck - rangeRequests chan rangeRequest - activeRangeRequests map[uint64]bool - activeRangeRequestsMu sync.Mutex - rangeReqId uint64 - peerRequests chan peerRequest + rangeRequests chan rangeRequest + activeRangeRequests *requestIdMap + rangeReqId uint64 + peerRequests chan peerRequest results chan syncResult @@ -285,10 +284,10 @@ func NewSyncClient(log log.Logger, cfg *rollup.Config, newStream newStreamFn, rc peers: make(map[peer.ID]context.CancelFunc), quarantineByNum: make(map[uint64]common.Hash), rangeRequests: make(chan rangeRequest), // blocking - activeRangeRequests: make(map[uint64]bool), + activeRangeRequests: newRequestIdMap(), peerRequests: make(chan peerRequest, 128), results: make(chan syncResult, 128), - inFlight: newInFlight(), + inFlight: newRequestIdMap(), inFlightChecks: make(chan inFlightCheck, 128), globalRL: rate.NewLimiter(globalServerBlocksRateLimit, globalServerBlocksBurst), resCtx: ctx, @@ -360,15 +359,15 @@ func (s *SyncClient) RequestL2Range(ctx context.Context, start, end eth.L2BlockR } // Create shared rangeReqId so associated peerRequests can all be cancelled by setting a single flag rangeReqId := atomic.AddUint64(&s.rangeReqId, 1) - s.activeRangeRequestsMu.Lock() - defer s.activeRangeRequestsMu.Unlock() + // need to flag request as active before adding request to s.rangeRequests to avoid race + s.activeRangeRequests.set(rangeReqId, true) // synchronize requests with the main loop for state access select { case s.rangeRequests <- rangeRequest{start: start.Number, end: end, id: rangeReqId}: - s.activeRangeRequests[rangeReqId] = true return rangeReqId, nil case <-ctx.Done(): + s.activeRangeRequests.delete(rangeReqId) return rangeReqId, fmt.Errorf("too busy with P2P results/requests: %w", ctx.Err()) } } @@ -563,10 +562,7 @@ func (s *SyncClient) peerLoop(ctx context.Context, id peer.ID) { // once the peer is available, wait for a sync request. select { case pr := <-s.peerRequests: - s.activeRangeRequestsMu.Lock() - isActive := s.activeRangeRequests[pr.rangeReqId] - s.activeRangeRequestsMu.Unlock() - if !isActive { + if !s.activeRangeRequests.get(pr.rangeReqId) { log.Debug("dropping cancelled p2p sync request", "num", pr.num) s.inFlight.delete(pr.num) continue @@ -588,9 +584,7 @@ func (s *SyncClient) peerLoop(ctx context.Context, id peer.ID) { resultCode = re.ResultCode() if resultCode == ResultCodeNotFoundErr { log.Warn("cancelling p2p sync range request", "rangeReqId", pr.rangeReqId) - s.activeRangeRequestsMu.Lock() - delete(s.activeRangeRequests, pr.rangeReqId) - s.activeRangeRequestsMu.Unlock() + s.activeRangeRequests.delete(pr.rangeReqId) sendResponseError = false // don't penalize peer for this error } } diff --git a/op-node/p2p/sync_test.go b/op-node/p2p/sync_test.go index 8030e6202c6f..cd44fb521473 100644 --- a/op-node/p2p/sync_test.go +++ b/op-node/p2p/sync_test.go @@ -274,10 +274,8 @@ func TestMultiPeerSync(t *testing.T) { payloads.deletePayload(25) rangeReqId, err := clB.RequestL2Range(ctx, payloads.getBlockRef(20), payloads.getBlockRef(30)) - clB.activeRangeRequestsMu.Lock() require.NoError(t, err) - require.True(t, clB.activeRangeRequests[rangeReqId], "expecting range request to be active") - clB.activeRangeRequestsMu.Unlock() + require.True(t, clB.activeRangeRequests.get(rangeReqId), "expecting range request to be active") for i := uint64(29); i > 25; i-- { p := <-recvB @@ -320,9 +318,7 @@ func TestMultiPeerSync(t *testing.T) { break } } - clB.activeRangeRequestsMu.Lock() - require.False(t, clB.activeRangeRequests[rangeReqId], "expecting range request to be cancelled") - clB.activeRangeRequestsMu.Unlock() + require.False(t, clB.activeRangeRequests.get(rangeReqId), "expecting range request to be cancelled") // Add back the block payloads.addPayload(bl25)