diff --git a/client.go b/client.go index b6fed302f3..9306624b96 100644 --- a/client.go +++ b/client.go @@ -686,7 +686,7 @@ func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) { } } -// Performs initiator handshakes and returns a connection. Returns nil *connection if no connection +// Performs initiator handshakes and returns a connection. Returns nil *PeerConn if no connection // for valid reasons. func (cl *Client) initiateProtocolHandshakes( ctx context.Context, @@ -730,14 +730,16 @@ func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfus return nil, errors.New("dial failed") } addrIpPort, _ := tryIpPortFromNetAddr(addr) - c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, obfuscatedHeader, newConnectionOpts{ - outgoing: true, - remoteAddr: addr, - // It would be possible to retrieve a public IP from the dialer used here? - localPublicAddr: cl.publicAddr(addrIpPort.IP), - network: dr.Dialer.DialerNetwork(), - connString: regularNetConnPeerConnConnString(nc), - }) + c, err := cl.initiateProtocolHandshakes( + context.Background(), nc, t, obfuscatedHeader, + newConnectionOpts{ + outgoing: true, + remoteAddr: addr, + // It would be possible to retrieve a public IP from the dialer used here? + localPublicAddr: cl.publicAddr(addrIpPort.IP), + network: dr.Dialer.DialerNetwork(), + connString: regularNetConnPeerConnConnString(nc), + }) if err != nil { nc.Close() } @@ -1510,13 +1512,17 @@ func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerCon } } c.peerImpl = c - c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c) + c.logger = cl.logger.WithDefaultLevel(log.Warning) c.setRW(connStatsReadWriter{nc, c}) c.r = &rateLimitedReader{ l: cl.config.DownloadRateLimiter, r: c.r, } - c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", opts.remoteAddr, opts.network, opts.outgoing) + c.logger.Levelf( + log.Debug, + "new PeerConn %p [Client %p remoteAddr %v network %v outgoing %t]", + c, cl, opts.remoteAddr, opts.network, opts.outgoing, + ) for _, f := range cl.config.Callbacks.NewPeer { f(&c.Peer) } diff --git a/go.mod b/go.mod index 64e1d63a4d..cf5700a18b 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/anacrolix/dht/v2 v2.19.2-0.20221121215055-066ad8494444 github.com/anacrolix/envpprof v1.2.1 github.com/anacrolix/fuse v0.2.0 - github.com/anacrolix/generics v0.0.0-20220618083756-f99e35403a60 + github.com/anacrolix/generics v0.0.0-20230428105757-683593396d68 github.com/anacrolix/go-libutp v1.2.0 github.com/anacrolix/log v0.13.2-0.20221123232138-02e2764801c3 github.com/anacrolix/missinggo v1.3.0 diff --git a/go.sum b/go.sum index aef4f0eec8..4e9c7ec466 100644 --- a/go.sum +++ b/go.sum @@ -78,6 +78,8 @@ github.com/anacrolix/fuse v0.2.0 h1:pc+To78kI2d/WUjIyrsdqeJQAesuwpGxlI3h1nAv3Do= github.com/anacrolix/fuse v0.2.0/go.mod h1:Kfu02xBwnySDpH3N23BmrP3MDfwAQGRLUCj6XyeOvBQ= github.com/anacrolix/generics v0.0.0-20220618083756-f99e35403a60 h1:k4/h2B1gGF+PJGyGHxs8nmHHt1pzWXZWBj6jn4OBlRc= github.com/anacrolix/generics v0.0.0-20220618083756-f99e35403a60/go.mod h1:ff2rHB/joTV03aMSSn/AZNnaIpUw0h3njetGsaXcMy8= +github.com/anacrolix/generics v0.0.0-20230428105757-683593396d68 h1:fyXlBfnlFzZSFckJ8QLb2lfmWfY++4RiUnae7ZMuv0A= +github.com/anacrolix/generics v0.0.0-20230428105757-683593396d68/go.mod h1:ff2rHB/joTV03aMSSn/AZNnaIpUw0h3njetGsaXcMy8= github.com/anacrolix/go-libutp v1.2.0 h1:sjxoB+/ARiKUR7IK/6wLWyADIBqGmu1fm0xo+8Yy7u0= github.com/anacrolix/go-libutp v1.2.0/go.mod h1:RrJ3KcaDcf9Jqp33YL5V/5CBEc6xMc7aJL8wXfuWL50= github.com/anacrolix/log v0.3.0/go.mod h1:lWvLTqzAnCWPJA08T2HCstZi0L1y2Wyvm3FJgwU9jwU= diff --git a/netip-addrport.go b/netip-addrport.go new file mode 100644 index 0000000000..cf9edfd5ed --- /dev/null +++ b/netip-addrport.go @@ -0,0 +1,44 @@ +package torrent + +import ( + "fmt" + "net" + "net/netip" + + "github.com/anacrolix/dht/v2/krpc" +) + +func ipv4AddrPortFromKrpcNodeAddr(na krpc.NodeAddr) (_ netip.AddrPort, err error) { + ip4 := na.IP.To4() + if ip4 == nil { + err = fmt.Errorf("not an ipv4 address: %v", na.IP) + return + } + addr := netip.AddrFrom4([4]byte(ip4)) + addrPort := netip.AddrPortFrom(addr, uint16(na.Port)) + return addrPort, nil +} + +func ipv6AddrPortFromKrpcNodeAddr(na krpc.NodeAddr) (_ netip.AddrPort, err error) { + ip6 := na.IP.To16() + if ip6 == nil { + err = fmt.Errorf("not an ipv4 address: %v", na.IP) + return + } + addr := netip.AddrFrom16([16]byte(ip6)) + addrPort := netip.AddrPortFrom(addr, uint16(na.Port)) + return addrPort, nil +} + +func addrPortFromPeerRemoteAddr(pra PeerRemoteAddr) (netip.AddrPort, error) { + switch v := pra.(type) { + case *net.TCPAddr: + return v.AddrPort(), nil + case *net.UDPAddr: + return v.AddrPort(), nil + case netip.AddrPort: + return v, nil + default: + return netip.ParseAddrPort(pra.String()) + } +} diff --git a/peer.go b/peer.go index d5ed19e53a..e88485b296 100644 --- a/peer.go +++ b/peer.go @@ -275,7 +275,7 @@ func (cn *Peer) iterContiguousPieceRequests(f func(piece pieceIndex, count int)) next(None[pieceIndex]()) } -func (cn *Peer) writeStatus(w io.Writer, t *Torrent) { +func (cn *Peer) writeStatus(w io.Writer) { // \t isn't preserved in
blocks? if cn.closed.IsSet() { fmt.Fprint(w, "CLOSED: ") diff --git a/peer_protocol/handshake.go b/peer_protocol/handshake.go index acdc3da58f..76dc2b05d2 100644 --- a/peer_protocol/handshake.go +++ b/peer_protocol/handshake.go @@ -5,7 +5,10 @@ import ( "errors" "fmt" "io" + "math/bits" "strconv" + "strings" + "unsafe" "github.com/anacrolix/torrent/metainfo" ) @@ -33,8 +36,31 @@ type ( PeerExtensionBits [8]byte ) +var bitTags = []struct { + bit ExtensionBit + tag string +}{ + // Ordered by their base protocol type values (PORT, fast.., EXTENDED) + {ExtensionBitDHT, "dht"}, + {ExtensionBitFast, "fast"}, + {ExtensionBitExtended, "ext"}, +} + func (pex PeerExtensionBits) String() string { - return hex.EncodeToString(pex[:]) + pexHex := hex.EncodeToString(pex[:]) + tags := make([]string, 0, len(bitTags)+1) + for _, bitTag := range bitTags { + if pex.GetBit(bitTag.bit) { + tags = append(tags, bitTag.tag) + pex.SetBit(bitTag.bit, false) + } + } + unknownCount := bits.OnesCount64(*(*uint64)((unsafe.Pointer(unsafe.SliceData(pex[:]))))) + if unknownCount != 0 { + tags = append(tags, fmt.Sprintf("%v unknown", unknownCount)) + } + return fmt.Sprintf("%v (%s)", pexHex, strings.Join(tags, ", ")) + } func NewPeerExtensionBytes(bits ...ExtensionBit) (ret PeerExtensionBits) { diff --git a/peer_protocol/pex.go b/peer_protocol/pex.go index 784aaa5925..466548a30a 100644 --- a/peer_protocol/pex.go +++ b/peer_protocol/pex.go @@ -28,6 +28,7 @@ func (m *PexMsg) Message(pexExtendedId ExtensionNumber) Message { } } +// Unmarshals and returns a PEX message. func LoadPexMsg(b []byte) (ret PexMsg, err error) { err = bencode.Unmarshal(b, &ret) return diff --git a/peerconn.go b/peerconn.go index 0318cb6e33..1c55db6258 100644 --- a/peerconn.go +++ b/peerconn.go @@ -62,7 +62,19 @@ type PeerConn struct { } func (cn *PeerConn) peerImplStatusLines() []string { - return []string{fmt.Sprintf("%+-55q %s %s", cn.PeerID, cn.PeerExtensionBytes, cn.connString)} + lines := make([]string, 0, 2) + lines = append( + lines, + fmt.Sprintf("%+-55q %v %s", cn.PeerID, cn.PeerExtensionBytes, cn.connString)) + if cn.supportsExtension(pp.ExtensionNamePex) { + lines = append( + lines, + fmt.Sprintf( + "pex: %v conns, %v unsent events", + cn.pex.remoteLiveConns, + cn.pex.numPending())) + } + return lines } // Returns true if the connection is over IPv6. @@ -848,6 +860,7 @@ func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err c.requestPendingMetadata() if !t.cl.config.DisablePEX { t.pex.Add(c) // we learnt enough now + // This checks the extension is supported internally. c.pex.Init(c) } return nil diff --git a/pex.go b/pex.go index 7fa0be887d..4561c5e4d2 100644 --- a/pex.go +++ b/pex.go @@ -145,8 +145,8 @@ func (me *pexMsgFactory) append(event pexEvent) { } } -func (me *pexMsgFactory) PexMsg() pp.PexMsg { - return me.msg +func (me *pexMsgFactory) PexMsg() *pp.PexMsg { + return &me.msg } // Convert an arbitrary torrent peer Addr into one that can be represented by the compact addr @@ -225,7 +225,7 @@ func (s *pexState) Genmsg(start *pexEvent) (pp.PexMsg, *pexEvent) { s.RLock() defer s.RUnlock() if start == nil { - return s.msg0.PexMsg(), s.tail + return *s.msg0.PexMsg(), s.tail } var msg pexMsgFactory last := start @@ -236,5 +236,18 @@ func (s *pexState) Genmsg(start *pexEvent) (pp.PexMsg, *pexEvent) { msg.append(*e) last = e } - return msg.PexMsg(), last + return *msg.PexMsg(), last +} + +// The same as Genmsg but just counts up the distinct events that haven't been sent. +func (s *pexState) numPending(start *pexEvent) (num int) { + s.RLock() + defer s.RUnlock() + if start == nil { + return s.msg0.PexMsg().Len() + } + for e := start.next; e != nil; e = e.next { + num++ + } + return } diff --git a/pexconn.go b/pexconn.go index d0308f756d..5ccc02200d 100644 --- a/pexconn.go +++ b/pexconn.go @@ -2,8 +2,11 @@ package torrent import ( "fmt" + "net/netip" "time" + g "github.com/anacrolix/generics" + "github.com/anacrolix/log" pp "github.com/anacrolix/torrent/peer_protocol" @@ -26,6 +29,8 @@ type pexConnState struct { Listed bool info log.Logger dbg log.Logger + // Running record of live connections the remote end of the connection purports to have. + remoteLiveConns map[netip.AddrPort]g.Option[pp.PexPeerFlags] } func (s *pexConnState) IsEnabled() bool { @@ -67,6 +72,13 @@ func (s *pexConnState) genmsg() *pp.PexMsg { return &tx } +func (s *pexConnState) numPending() int { + if s.torrent == nil { + return 0 + } + return s.torrent.pex.numPending(s.last) +} + // Share is called from the writer goroutine if when it is woken up with the write buffers empty // Returns whether there's more room on the send buffer to write to. func (s *pexConnState) Share(postfn messageWriter) bool { @@ -86,25 +98,51 @@ func (s *pexConnState) Share(postfn messageWriter) bool { return true } +func (s *pexConnState) updateRemoteLiveConns(rx pp.PexMsg) (errs []error) { + for _, dropped := range rx.Dropped { + addrPort, _ := ipv4AddrPortFromKrpcNodeAddr(dropped) + delete(s.remoteLiveConns, addrPort) + } + for _, dropped := range rx.Dropped6 { + addrPort, _ := ipv6AddrPortFromKrpcNodeAddr(dropped) + delete(s.remoteLiveConns, addrPort) + } + for i, added := range rx.Added { + addr := netip.AddrFrom4([4]byte(added.IP.To4())) + addrPort := netip.AddrPortFrom(addr, uint16(added.Port)) + flags := g.SliceGet(rx.AddedFlags, i) + g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags) + } + for i, added := range rx.Added6 { + addr := netip.AddrFrom16([16]byte(added.IP.To16())) + addrPort := netip.AddrPortFrom(addr, uint16(added.Port)) + flags := g.SliceGet(rx.Added6Flags, i) + g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags) + } + return +} + // Recv is called from the reader goroutine func (s *pexConnState) Recv(payload []byte) error { + rx, err := pp.LoadPexMsg(payload) + if err != nil { + return fmt.Errorf("unmarshalling pex message: %w", err) + } + s.dbg.Printf("received pex message: %v", rx) + torrent.Add("pex added peers received", int64(len(rx.Added))) + torrent.Add("pex added6 peers received", int64(len(rx.Added6))) + s.updateRemoteLiveConns(rx) + if !s.torrent.wantPeers() { s.dbg.Printf("peer reserve ok, incoming PEX discarded") return nil } + // TODO: This should be per conn, not for the whole Torrent. if time.Now().Before(s.torrent.pex.rest) { s.dbg.Printf("in cooldown period, incoming PEX discarded") return nil } - rx, err := pp.LoadPexMsg(payload) - if err != nil { - return fmt.Errorf("error unmarshalling PEX message: %s", err) - } - s.dbg.Print("incoming PEX message: ", rx) - torrent.Add("pex added peers received", int64(len(rx.Added))) - torrent.Add("pex added6 peers received", int64(len(rx.Added6))) - var peers peerInfos peers.AppendFromPex(rx.Added6, rx.Added6Flags) peers.AppendFromPex(rx.Added, rx.AddedFlags) diff --git a/test/transfer_test.go b/test/transfer_test.go index b96c94d329..fa159326a9 100644 --- a/test/transfer_test.go +++ b/test/transfer_test.go @@ -13,7 +13,7 @@ import ( "github.com/anacrolix/torrent" "github.com/anacrolix/torrent/internal/testutil" "github.com/anacrolix/torrent/storage" - "github.com/frankban/quicktest" + qt "github.com/frankban/quicktest" "golang.org/x/time/rate" "github.com/stretchr/testify/assert" @@ -188,7 +188,7 @@ func TestSeedAfterDownloading(t *testing.T) { defer wg.Done() r := llg.NewReader() defer r.Close() - quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil) + qt.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), qt.IsNil) }() done := make(chan struct{}) defer close(done) diff --git a/torrent.go b/torrent.go index f8d31b647d..2f6832e970 100644 --- a/torrent.go +++ b/torrent.go @@ -108,7 +108,7 @@ type Torrent struct { // them. That encourages us to reconnect to peers that are well known in // the swarm. peers prioritizedPeers - // Whether we want to know to know more peers. + // Whether we want to know more peers. wantPeersEvent missinggo.Event // An announcer for each tracker URL. trackerAnnouncers map[string]torrentTrackerAnnouncer @@ -774,7 +774,7 @@ func (t *Torrent) writeStatus(w io.Writer) { for i, c := range peers { fmt.Fprintf(w, "%2d. ", i+1) buf.Reset() - c.writeStatus(&buf, t) + c.writeStatus(&buf) w.Write(bytes.TrimRight( bytes.ReplaceAll(buf.Bytes(), []byte("\n"), []byte("\n ")), " ")) @@ -1983,6 +1983,7 @@ func (t *Torrent) addPeerConn(c *PeerConn) (err error) { panic(len(t.conns)) } t.conns[c] = struct{}{} + t.cl.event.Broadcast() if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() { t.pex.Add(c) // as no further extended handshake expected }