diff --git a/p2p/net/swarm/dial_worker.go b/p2p/net/swarm/dial_worker.go index 8aa1e527c0..21f73c7b0a 100644 --- a/p2p/net/swarm/dial_worker.go +++ b/p2p/net/swarm/dial_worker.go @@ -91,8 +91,9 @@ type dialWorker struct { connected bool // dq is used to pace dials to different addresses of the peer dq *dialQueue - // dialsInFlight are the addresses with dials pending completion. - dialsInFlight int + // dialsInFlight are the addresses with dials pending completion. We use this to schedule new dials + // and to cleanup all pending dials when closing the loop + dialsInFlight map[string]bool // totalDials is used to track number of dials made by this worker for metrics totalDials int @@ -112,6 +113,7 @@ func newDialWorker(s *Swarm, p peer.ID, reqch <-chan dialRequest, cl Clock) *dia pendingRequests: make(map[*pendRequest]bool), trackedDials: make(map[string]*addrDial), resch: make(chan dialResult), + dialsInFlight: make(map[string]bool), cl: cl, } } @@ -136,7 +138,7 @@ func (w *dialWorker) loop() { } timerRunning = false if w.dq.Len() > 0 { - if w.dialsInFlight == 0 && !w.connected { + if len(w.dialsInFlight) == 0 && !w.connected { // if there are no dials in flight, trigger the next dials immediately dialTimer.Reset(startTime) } else { @@ -172,25 +174,7 @@ loop: continue loop } - addrs, addrErrs, err := w.s.addrsForDial(req.ctx, w.peer) - if err != nil { - req.resch <- dialResponse{err: &DialError{Peer: w.peer, DialErrors: addrErrs, Cause: err}} - continue loop - } - - // TODO(sukunrt): remove this check. This should never happen given the call - // to w.s.bestAcceptableConnToPeer above, but I think this happens for circuit v2 addresses - for _, addr := range addrs { - if ad, ok := w.trackedDials[string(addr.Bytes())]; ok { - if ad.conn != nil { - // dial to this addr was successful, complete the request - req.resch <- dialResponse{conn: ad.conn} - continue loop - } - } - } - - w.addNewRequest(req, addrs, addrErrs) + w.addNewRequest(req) scheduleNextDial() case <-dialTimer.Ch(): @@ -214,7 +198,7 @@ loop: // Errored without attempting a dial. This happens in case of backoff. w.dispatchError(ad, err) } else { - w.dialsInFlight++ + w.dialsInFlight[string(ad.addr.Bytes())] = true w.totalDials++ } } @@ -233,9 +217,7 @@ loop: if res.Conn != nil { res.Conn.Close() } - // It is better to decrement the dials in flight and schedule one extra dial - // than risking not closing the worker loop on cleanup - w.dialsInFlight-- + delete(w.dialsInFlight, string(res.Addr.Bytes())) continue } @@ -245,7 +227,7 @@ loop: continue } - w.dialsInFlight-- + delete(w.dialsInFlight, string(res.Addr.Bytes())) // We're recording any error as a failure here. // Notably, this also applies to cancelations (i.e. if another dial attempt was faster). // This is ok since the black hole detector uses a very low threshold (5%). @@ -261,14 +243,31 @@ loop: } } -// addNewRequest adds a new dial request to the worker loop. If the request has no pending dials, a response -// is sent immediately otherwise it is tracked in pendingRequests -func (w *dialWorker) addNewRequest(req dialRequest, addrs []ma.Multiaddr, addrErrs []TransportError) { +// addNewRequest adds a new dial request to the worker loop. If the request has a valid connection or all relevant +// dials have failed, the request is handled immediately, otherwise it is added to pendingRequests. +func (w *dialWorker) addNewRequest(req dialRequest) { + addrs, addrErrs, err := w.s.addrsForDial(req.ctx, w.peer) + if err != nil { + req.resch <- dialResponse{err: &DialError{Peer: w.peer, DialErrors: addrErrs, Cause: err}} + return + } + + // TODO(sukunrt): remove this check. This should never happen given the call + // to w.s.bestAcceptableConnToPeer above, but I think this happens for circuit v2 addresses + for _, addr := range addrs { + if ad, ok := w.trackedDials[string(addr.Bytes())]; ok { + if ad.conn != nil { + // dial to this addr was successful, complete the request + req.resch <- dialResponse{conn: ad.conn} + return + } + } + } + // get the delays to dial these addrs from the swarms dialRanker simConnect, _, _ := network.GetSimultaneousConnect(req.ctx) addrRanking := w.rankAddrs(addrs, simConnect) - // create the pending request object pr := &pendRequest{ req: req, err: &DialError{Peer: w.peer, DialErrors: addrErrs}, @@ -300,8 +299,9 @@ func (w *dialWorker) addNewRequest(req dialRequest, addrs []ma.Multiaddr, addrEr } if !ad.dialed { - // we haven't dialed this address. update the ad.ctx to have simultaneous connect values - // set correctly + // We are tracking a dial to this address but we haven't dialled it already. + // If the new request is a holepunching request, update the context and the element in the + // dial queue if isSimConnect, isClient, reason := network.GetSimultaneousConnect(req.ctx); isSimConnect { if wasSimConnect, _, _ := network.GetSimultaneousConnect(ad.ctx); !wasSimConnect { ad.ctx = network.WithSimultaneousConnect(ad.ctx, isClient, reason) @@ -433,8 +433,11 @@ func (w *dialWorker) cleanup() { if w.s.metricsTracer != nil { w.s.metricsTracer.DialCompleted(w.connected, w.totalDials) } - for w.dialsInFlight > 0 { + for len(w.dialsInFlight) > 0 { res := <-w.resch + if res.Kind != DialFailed && res.Kind != DialSuccessful { + continue + } // We're recording any error as a failure here. // Notably, this also applies to cancelations (i.e. if another dial attempt was faster). // This is ok since the black hole detector uses a very low threshold (5%). @@ -442,7 +445,7 @@ func (w *dialWorker) cleanup() { if res.Conn != nil { res.Conn.Close() } - w.dialsInFlight-- + delete(w.dialsInFlight, string(res.Addr.Bytes())) } }