Skip to content

Commit

Permalink
Misc debug status, pex conn tracking improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
anacrolix committed May 1, 2023
1 parent 7e65e55 commit 1e13625
Show file tree
Hide file tree
Showing 12 changed files with 175 additions and 31 deletions.
28 changes: 17 additions & 11 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
}
}

// Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
// Performs initiator handshakes and returns a connection. Returns nil *PeerConn if no connection
// for valid reasons.
func (cl *Client) initiateProtocolHandshakes(
ctx context.Context,
Expand Down Expand Up @@ -730,14 +730,16 @@ func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfus
return nil, errors.New("dial failed")
}
addrIpPort, _ := tryIpPortFromNetAddr(addr)
c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, obfuscatedHeader, newConnectionOpts{
outgoing: true,
remoteAddr: addr,
// It would be possible to retrieve a public IP from the dialer used here?
localPublicAddr: cl.publicAddr(addrIpPort.IP),
network: dr.Dialer.DialerNetwork(),
connString: regularNetConnPeerConnConnString(nc),
})
c, err := cl.initiateProtocolHandshakes(
context.Background(), nc, t, obfuscatedHeader,
newConnectionOpts{
outgoing: true,
remoteAddr: addr,
// It would be possible to retrieve a public IP from the dialer used here?
localPublicAddr: cl.publicAddr(addrIpPort.IP),
network: dr.Dialer.DialerNetwork(),
connString: regularNetConnPeerConnConnString(nc),
})
if err != nil {
nc.Close()
}
Expand Down Expand Up @@ -1510,13 +1512,17 @@ func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerCon
}
}
c.peerImpl = c
c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
c.logger = cl.logger.WithDefaultLevel(log.Warning)
c.setRW(connStatsReadWriter{nc, c})
c.r = &rateLimitedReader{
l: cl.config.DownloadRateLimiter,
r: c.r,
}
c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", opts.remoteAddr, opts.network, opts.outgoing)
c.logger.Levelf(
log.Debug,
"new PeerConn %p [Client %p remoteAddr %v network %v outgoing %t]",
c, cl, opts.remoteAddr, opts.network, opts.outgoing,
)
for _, f := range cl.config.Callbacks.NewPeer {
f(&c.Peer)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/anacrolix/dht/v2 v2.19.2-0.20221121215055-066ad8494444
github.com/anacrolix/envpprof v1.2.1
github.com/anacrolix/fuse v0.2.0
github.com/anacrolix/generics v0.0.0-20220618083756-f99e35403a60
github.com/anacrolix/generics v0.0.0-20230428105757-683593396d68
github.com/anacrolix/go-libutp v1.2.0
github.com/anacrolix/log v0.13.2-0.20221123232138-02e2764801c3
github.com/anacrolix/missinggo v1.3.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ github.com/anacrolix/fuse v0.2.0 h1:pc+To78kI2d/WUjIyrsdqeJQAesuwpGxlI3h1nAv3Do=
github.com/anacrolix/fuse v0.2.0/go.mod h1:Kfu02xBwnySDpH3N23BmrP3MDfwAQGRLUCj6XyeOvBQ=
github.com/anacrolix/generics v0.0.0-20220618083756-f99e35403a60 h1:k4/h2B1gGF+PJGyGHxs8nmHHt1pzWXZWBj6jn4OBlRc=
github.com/anacrolix/generics v0.0.0-20220618083756-f99e35403a60/go.mod h1:ff2rHB/joTV03aMSSn/AZNnaIpUw0h3njetGsaXcMy8=
github.com/anacrolix/generics v0.0.0-20230428105757-683593396d68 h1:fyXlBfnlFzZSFckJ8QLb2lfmWfY++4RiUnae7ZMuv0A=
github.com/anacrolix/generics v0.0.0-20230428105757-683593396d68/go.mod h1:ff2rHB/joTV03aMSSn/AZNnaIpUw0h3njetGsaXcMy8=
github.com/anacrolix/go-libutp v1.2.0 h1:sjxoB+/ARiKUR7IK/6wLWyADIBqGmu1fm0xo+8Yy7u0=
github.com/anacrolix/go-libutp v1.2.0/go.mod h1:RrJ3KcaDcf9Jqp33YL5V/5CBEc6xMc7aJL8wXfuWL50=
github.com/anacrolix/log v0.3.0/go.mod h1:lWvLTqzAnCWPJA08T2HCstZi0L1y2Wyvm3FJgwU9jwU=
Expand Down
44 changes: 44 additions & 0 deletions netip-addrport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package torrent

import (
"fmt"
"net"
"net/netip"

"github.com/anacrolix/dht/v2/krpc"
)

func ipv4AddrPortFromKrpcNodeAddr(na krpc.NodeAddr) (_ netip.AddrPort, err error) {
ip4 := na.IP.To4()
if ip4 == nil {
err = fmt.Errorf("not an ipv4 address: %v", na.IP)
return
}
addr := netip.AddrFrom4([4]byte(ip4))
addrPort := netip.AddrPortFrom(addr, uint16(na.Port))
return addrPort, nil
}

func ipv6AddrPortFromKrpcNodeAddr(na krpc.NodeAddr) (_ netip.AddrPort, err error) {
ip6 := na.IP.To16()
if ip6 == nil {
err = fmt.Errorf("not an ipv4 address: %v", na.IP)
return
}
addr := netip.AddrFrom16([16]byte(ip6))
addrPort := netip.AddrPortFrom(addr, uint16(na.Port))
return addrPort, nil
}

func addrPortFromPeerRemoteAddr(pra PeerRemoteAddr) (netip.AddrPort, error) {
switch v := pra.(type) {
case *net.TCPAddr:
return v.AddrPort(), nil
case *net.UDPAddr:
return v.AddrPort(), nil
case netip.AddrPort:
return v, nil
default:
return netip.ParseAddrPort(pra.String())
}
}
2 changes: 1 addition & 1 deletion peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (cn *Peer) iterContiguousPieceRequests(f func(piece pieceIndex, count int))
next(None[pieceIndex]())
}

func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
func (cn *Peer) writeStatus(w io.Writer) {
// \t isn't preserved in <pre> blocks?
if cn.closed.IsSet() {
fmt.Fprint(w, "CLOSED: ")
Expand Down
28 changes: 27 additions & 1 deletion peer_protocol/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import (
"errors"
"fmt"
"io"
"math/bits"
"strconv"
"strings"
"unsafe"

"github.com/anacrolix/torrent/metainfo"
)
Expand Down Expand Up @@ -33,8 +36,31 @@ type (
PeerExtensionBits [8]byte
)

var bitTags = []struct {
bit ExtensionBit
tag string
}{
// Ordered by their base protocol type values (PORT, fast.., EXTENDED)
{ExtensionBitDHT, "dht"},
{ExtensionBitFast, "fast"},
{ExtensionBitExtended, "ext"},
}

func (pex PeerExtensionBits) String() string {
return hex.EncodeToString(pex[:])
pexHex := hex.EncodeToString(pex[:])
tags := make([]string, 0, len(bitTags)+1)
for _, bitTag := range bitTags {
if pex.GetBit(bitTag.bit) {
tags = append(tags, bitTag.tag)
pex.SetBit(bitTag.bit, false)
}
}
unknownCount := bits.OnesCount64(*(*uint64)((unsafe.Pointer(unsafe.SliceData(pex[:])))))
if unknownCount != 0 {
tags = append(tags, fmt.Sprintf("%v unknown", unknownCount))
}
return fmt.Sprintf("%v (%s)", pexHex, strings.Join(tags, ", "))

}

func NewPeerExtensionBytes(bits ...ExtensionBit) (ret PeerExtensionBits) {
Expand Down
1 change: 1 addition & 0 deletions peer_protocol/pex.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func (m *PexMsg) Message(pexExtendedId ExtensionNumber) Message {
}
}

// Unmarshals and returns a PEX message.
func LoadPexMsg(b []byte) (ret PexMsg, err error) {
err = bencode.Unmarshal(b, &ret)
return
Expand Down
15 changes: 14 additions & 1 deletion peerconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,19 @@ type PeerConn struct {
}

func (cn *PeerConn) peerImplStatusLines() []string {
return []string{fmt.Sprintf("%+-55q %s %s", cn.PeerID, cn.PeerExtensionBytes, cn.connString)}
lines := make([]string, 0, 2)
lines = append(
lines,
fmt.Sprintf("%+-55q %v %s", cn.PeerID, cn.PeerExtensionBytes, cn.connString))
if cn.supportsExtension(pp.ExtensionNamePex) {
lines = append(
lines,
fmt.Sprintf(
"pex: %v conns, %v unsent events",
cn.pex.remoteLiveConns,
cn.pex.numPending()))
}
return lines
}

// Returns true if the connection is over IPv6.
Expand Down Expand Up @@ -848,6 +860,7 @@ func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err
c.requestPendingMetadata()
if !t.cl.config.DisablePEX {
t.pex.Add(c) // we learnt enough now
// This checks the extension is supported internally.
c.pex.Init(c)
}
return nil
Expand Down
21 changes: 17 additions & 4 deletions pex.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ func (me *pexMsgFactory) append(event pexEvent) {
}
}

func (me *pexMsgFactory) PexMsg() pp.PexMsg {
return me.msg
func (me *pexMsgFactory) PexMsg() *pp.PexMsg {
return &me.msg
}

// Convert an arbitrary torrent peer Addr into one that can be represented by the compact addr
Expand Down Expand Up @@ -225,7 +225,7 @@ func (s *pexState) Genmsg(start *pexEvent) (pp.PexMsg, *pexEvent) {
s.RLock()
defer s.RUnlock()
if start == nil {
return s.msg0.PexMsg(), s.tail
return *s.msg0.PexMsg(), s.tail
}
var msg pexMsgFactory
last := start
Expand All @@ -236,5 +236,18 @@ func (s *pexState) Genmsg(start *pexEvent) (pp.PexMsg, *pexEvent) {
msg.append(*e)
last = e
}
return msg.PexMsg(), last
return *msg.PexMsg(), last
}

// The same as Genmsg but just counts up the distinct events that haven't been sent.
func (s *pexState) numPending(start *pexEvent) (num int) {
s.RLock()
defer s.RUnlock()
if start == nil {
return s.msg0.PexMsg().Len()
}
for e := start.next; e != nil; e = e.next {
num++
}
return
}
54 changes: 46 additions & 8 deletions pexconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package torrent

import (
"fmt"
"net/netip"
"time"

g "github.com/anacrolix/generics"

"github.com/anacrolix/log"

pp "github.com/anacrolix/torrent/peer_protocol"
Expand All @@ -26,6 +29,8 @@ type pexConnState struct {
Listed bool
info log.Logger
dbg log.Logger
// Running record of live connections the remote end of the connection purports to have.
remoteLiveConns map[netip.AddrPort]g.Option[pp.PexPeerFlags]
}

func (s *pexConnState) IsEnabled() bool {
Expand Down Expand Up @@ -67,6 +72,13 @@ func (s *pexConnState) genmsg() *pp.PexMsg {
return &tx
}

func (s *pexConnState) numPending() int {
if s.torrent == nil {
return 0
}
return s.torrent.pex.numPending(s.last)
}

// Share is called from the writer goroutine if when it is woken up with the write buffers empty
// Returns whether there's more room on the send buffer to write to.
func (s *pexConnState) Share(postfn messageWriter) bool {
Expand All @@ -86,25 +98,51 @@ func (s *pexConnState) Share(postfn messageWriter) bool {
return true
}

func (s *pexConnState) updateRemoteLiveConns(rx pp.PexMsg) (errs []error) {
for _, dropped := range rx.Dropped {
addrPort, _ := ipv4AddrPortFromKrpcNodeAddr(dropped)
delete(s.remoteLiveConns, addrPort)
}
for _, dropped := range rx.Dropped6 {
addrPort, _ := ipv6AddrPortFromKrpcNodeAddr(dropped)
delete(s.remoteLiveConns, addrPort)
}
for i, added := range rx.Added {
addr := netip.AddrFrom4([4]byte(added.IP.To4()))
addrPort := netip.AddrPortFrom(addr, uint16(added.Port))
flags := g.SliceGet(rx.AddedFlags, i)
g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags)
}
for i, added := range rx.Added6 {
addr := netip.AddrFrom16([16]byte(added.IP.To16()))
addrPort := netip.AddrPortFrom(addr, uint16(added.Port))
flags := g.SliceGet(rx.Added6Flags, i)
g.MakeMapIfNilAndSet(&s.remoteLiveConns, addrPort, flags)
}
return
}

// Recv is called from the reader goroutine
func (s *pexConnState) Recv(payload []byte) error {
rx, err := pp.LoadPexMsg(payload)
if err != nil {
return fmt.Errorf("unmarshalling pex message: %w", err)
}
s.dbg.Printf("received pex message: %v", rx)
torrent.Add("pex added peers received", int64(len(rx.Added)))
torrent.Add("pex added6 peers received", int64(len(rx.Added6)))
s.updateRemoteLiveConns(rx)

if !s.torrent.wantPeers() {
s.dbg.Printf("peer reserve ok, incoming PEX discarded")
return nil
}
// TODO: This should be per conn, not for the whole Torrent.
if time.Now().Before(s.torrent.pex.rest) {

This comment has been minimized.

Copy link
@yarikk

yarikk May 19, 2023

Collaborator

this "if" is useless after this change, as the throttled code is now executed before the gating check, @anacrolix

This comment has been minimized.

Copy link
@anacrolix

anacrolix May 19, 2023

Author Owner

I'm gating the addition of peers. I think we want to stop a peer flooding us with bogus peers. That's what the interval is really about (at least that's my reading of the BEP). I think the way it was before, only a single peer's PEX message was used per interval, per torrent, and the rest would be ignored. Did I get that wrong?

This comment has been minimized.

Copy link
@yarikk

yarikk May 19, 2023

Collaborator

Yes, not ideal, but without this PEX swarms were getting full-meshed very quickly, clogging every node's connection limits. I attached an example chart of number of peers to my original PR #446.

A better way to solve this would be to accept all PEX messages but put the info into an immediate table instead of calling addPeers immediately, then feed off the table in a paced manner, imitating talking to a tracker. That's, however, would be a much bigger and more invasive change I never felt adventurous enough to tackle.

This comment has been minimized.

Copy link
@anacrolix

anacrolix May 22, 2023

Author Owner

I need to follow up on this comment and double check it. I will get back to you, thanks @yarikk .

This comment has been minimized.

Copy link
@anacrolix

anacrolix May 23, 2023

Author Owner
s.dbg.Printf("in cooldown period, incoming PEX discarded")
return nil
}

rx, err := pp.LoadPexMsg(payload)
if err != nil {
return fmt.Errorf("error unmarshalling PEX message: %s", err)
}
s.dbg.Print("incoming PEX message: ", rx)
torrent.Add("pex added peers received", int64(len(rx.Added)))
torrent.Add("pex added6 peers received", int64(len(rx.Added6)))

var peers peerInfos
peers.AppendFromPex(rx.Added6, rx.Added6Flags)
peers.AppendFromPex(rx.Added, rx.AddedFlags)
Expand Down
4 changes: 2 additions & 2 deletions test/transfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/internal/testutil"
"github.com/anacrolix/torrent/storage"
"github.com/frankban/quicktest"
qt "github.com/frankban/quicktest"
"golang.org/x/time/rate"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -188,7 +188,7 @@ func TestSeedAfterDownloading(t *testing.T) {
defer wg.Done()
r := llg.NewReader()
defer r.Close()
quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil)
qt.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), qt.IsNil)
}()
done := make(chan struct{})
defer close(done)
Expand Down
5 changes: 3 additions & 2 deletions torrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ type Torrent struct {
// them. That encourages us to reconnect to peers that are well known in
// the swarm.
peers prioritizedPeers
// Whether we want to know to know more peers.
// Whether we want to know more peers.
wantPeersEvent missinggo.Event
// An announcer for each tracker URL.
trackerAnnouncers map[string]torrentTrackerAnnouncer
Expand Down Expand Up @@ -774,7 +774,7 @@ func (t *Torrent) writeStatus(w io.Writer) {
for i, c := range peers {
fmt.Fprintf(w, "%2d. ", i+1)
buf.Reset()
c.writeStatus(&buf, t)
c.writeStatus(&buf)
w.Write(bytes.TrimRight(
bytes.ReplaceAll(buf.Bytes(), []byte("\n"), []byte("\n ")),
" "))
Expand Down Expand Up @@ -1983,6 +1983,7 @@ func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
panic(len(t.conns))
}
t.conns[c] = struct{}{}
t.cl.event.Broadcast()
if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() {
t.pex.Add(c) // as no further extended handshake expected
}
Expand Down

0 comments on commit 1e13625

Please sign in to comment.