Skip to content

Commit

Permalink
Add low level support for BEP 10 user protocols
Browse files Browse the repository at this point in the history
  • Loading branch information
anacrolix committed Feb 22, 2024
1 parent 2cf5c7e commit 8605abc
Show file tree
Hide file tree
Showing 9 changed files with 302 additions and 31 deletions.
20 changes: 18 additions & 2 deletions callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@ import (
type Callbacks struct {
// Called after a peer connection completes the BitTorrent handshake. The Client lock is not
// held.
CompletedHandshake func(*PeerConn, InfoHash)
ReadMessage func(*PeerConn, *pp.Message)
CompletedHandshake func(*PeerConn, InfoHash)
ReadMessage func(*PeerConn, *pp.Message)
// This can be folded into the general case below.
ReadExtendedHandshake func(*PeerConn, *pp.ExtendedHandshakeMessage)
PeerConnClosed func(*PeerConn)
// BEP 10 message. Not sure if I should call this Ltep universally. Each handler here is called
// in order.
PeerConnReadExtensionMessage []func(PeerConnReadExtensionMessageEvent)

// Provides secret keys to be tried against incoming encrypted connections.
ReceiveEncryptedHandshakeSkeys mse.SecretKeyIter
Expand All @@ -25,6 +29,11 @@ type Callbacks struct {
SentRequest []func(PeerRequestEvent)
PeerClosed []func(*Peer)
NewPeer []func(*Peer)
// Called when a PeerConn has been added to a Torrent. It's finished all BitTorrent protocol
// handshakes, and is about to start sending and receiving BitTorrent messages. The extended
// handshake has not yet occurred. This is a good time to alter the supported extension
// protocols.
PeerConnAdded []func(*PeerConn)
}

type ReceivedUsefulDataEvent = PeerMessageEvent
Expand All @@ -38,3 +47,10 @@ type PeerRequestEvent struct {
Peer *Peer
Request
}

type PeerConnReadExtensionMessageEvent struct {
PeerConn *PeerConn
// You can look up what protocol this corresponds to using the PeerConn.LocalLtepProtocolMap.
ExtensionNumber pp.ExtensionNumber
Payload []byte
}
21 changes: 10 additions & 11 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import (
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/mse"
pp "github.com/anacrolix/torrent/peer_protocol"
utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch"
request_strategy "github.com/anacrolix/torrent/request-strategy"
"github.com/anacrolix/torrent/storage"
"github.com/anacrolix/torrent/tracker"
Expand Down Expand Up @@ -90,6 +89,8 @@ type Client struct {
httpClient *http.Client

clientHolepunchAddrSets

defaultLocalLtepProtocolMap LocalLtepProtocolMap
}

type ipStr string
Expand Down Expand Up @@ -214,16 +215,16 @@ func (cl *Client) init(cfg *ClientConfig) {
MaxConnsPerHost: 10,
}
}
cl.defaultLocalLtepProtocolMap = makeBuiltinLtepProtocols(!cfg.DisablePEX)
}

func NewClient(cfg *ClientConfig) (cl *Client, err error) {
if cfg == nil {
cfg = NewDefaultClientConfig()
cfg.ListenPort = 0
}
var client Client
client.init(cfg)
cl = &client
cl = &Client{}
cl.init(cfg)
go cl.acceptLimitClearer()
cl.initLogger()
defer func() {
Expand Down Expand Up @@ -1089,6 +1090,10 @@ func (t *Torrent) runHandshookConn(pc *PeerConn) error {
return fmt.Errorf("adding connection: %w", err)
}
defer t.dropConnection(pc)
pc.addBuiltinLtepProtocols(!cl.config.DisablePEX)
for _, cb := range pc.callbacks.PeerConnAdded {
cb(pc)
}
pc.startMessageWriter()
pc.sendInitialMessages()
pc.initUpdateRequestsTimer()
Expand Down Expand Up @@ -1146,10 +1151,6 @@ func (pc *PeerConn) sendInitialMessages() {
ExtendedID: pp.HandshakeExtendedID,
ExtendedPayload: func() []byte {
msg := pp.ExtendedHandshakeMessage{
M: map[pp.ExtensionName]pp.ExtensionNumber{
pp.ExtensionNameMetadata: metadataExtendedId,
utHolepunch.ExtensionName: utHolepunchExtendedId,
},
V: cl.config.ExtendedHandshakeClientVersion,
Reqq: localClientReqq,
YourIp: pp.CompactIp(pc.remoteIp()),
Expand All @@ -1160,9 +1161,7 @@ func (pc *PeerConn) sendInitialMessages() {
Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
Ipv6: cl.config.PublicIp6.To16(),
}
if !cl.config.DisablePEX {
msg.M[pp.ExtensionNamePex] = pexExtendedId
}
msg.M = pc.LocalLtepProtocolMap.toSupportedExtensionDict()
return bencode.MustMarshal(msg)
}(),
})
Expand Down
8 changes: 0 additions & 8 deletions global.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,6 @@ const (
maxMetadataSize uint32 = 16 * 1024 * 1024
)

// These are our extended message IDs. Peers will use these values to
// select which extension a message is intended for.
const (
metadataExtendedId = iota + 1 // 0 is reserved for deleting keys
pexExtendedId
utHolepunchExtendedId
)

func defaultPeerExtensionBytes() PeerExtensionBits {
return pp.NewPeerExtensionBytes(pp.ExtensionBitDht, pp.ExtensionBitLtep, pp.ExtensionBitFast)
}
Expand Down
69 changes: 69 additions & 0 deletions ltep.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package torrent

import (
"fmt"
"slices"

g "github.com/anacrolix/generics"
pp "github.com/anacrolix/torrent/peer_protocol"
)

type LocalLtepProtocolMap struct {
// 1-based mapping from extension number to extension name (subtract one from the extension ID
// to find the corresponding protocol name). The first LocalLtepProtocolBuiltinCount of these
// are use builtin handlers. If you want to handle builtin protocols yourself, you would move
// them above the threshold. You can disable them by removing them entirely, and add your own.
// These changes should be done in the PeerConnAdded callback.
Index []pp.ExtensionName
// How many of the protocols are using the builtin handlers.
NumBuiltin int
}

func (me *LocalLtepProtocolMap) toSupportedExtensionDict() (m map[pp.ExtensionName]pp.ExtensionNumber) {
g.MakeMapWithCap(&m, len(me.Index))
for i, name := range me.Index {
old := g.MapInsert(m, name, pp.ExtensionNumber(i+1))
if old.Ok {
panic(fmt.Sprintf("extension %q already defined with id %v", name, old.Value))
}
}
return
}

// Returns the local extension name for the given ID. If builtin is true, the implementation intends
// to handle it itself. For incoming messages with extension ID 0, the message is a handshake, and
// should be treated specially.
func (me *LocalLtepProtocolMap) LookupId(id pp.ExtensionNumber) (name pp.ExtensionName, builtin bool, err error) {
if id == 0 {
err = fmt.Errorf("extension ID 0 is handshake")
builtin = true
return
}
protocolIndex := int(id - 1)
if protocolIndex >= len(me.Index) {
err = fmt.Errorf("unexpected extended message ID: %v", id)
return
}
builtin = protocolIndex < me.NumBuiltin
name = me.Index[protocolIndex]
return
}

func (me *LocalLtepProtocolMap) builtin() []pp.ExtensionName {
return me.Index[:me.NumBuiltin]
}

func (me *LocalLtepProtocolMap) user() []pp.ExtensionName {
return me.Index[me.NumBuiltin:]
}

func (me *LocalLtepProtocolMap) AddUserProtocol(name pp.ExtensionName) {
builtin := slices.DeleteFunc(me.builtin(), func(delName pp.ExtensionName) bool {
return delName == name
})
user := slices.DeleteFunc(me.user(), func(delName pp.ExtensionName) bool {
return delName == name
})
me.Index = append(append(builtin, user...), name)
me.NumBuiltin = len(builtin)
}
130 changes: 130 additions & 0 deletions ltep_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package torrent_test

import (
"strconv"
"testing"

pp "github.com/anacrolix/torrent/peer_protocol"

qt "github.com/frankban/quicktest"

"github.com/anacrolix/torrent/internal/testutil"

"github.com/anacrolix/sync"

. "github.com/anacrolix/torrent"
)

const (
testRepliesToOddsExtensionName = "pm_me_odds"
testRepliesToEvensExtensionName = "pm_me_evens"
)

func countHandler(
c *qt.C,
wg *sync.WaitGroup,
// Name of the endpoint that this handler is for, for logging.
handlerName string,
// Whether we expect evens or odds
expectedMod2 uint,
// Extension name of messages we expect to handle.
answerToName pp.ExtensionName,
// Extension name of messages we expect to send.
replyToName pp.ExtensionName,
// Signal done when this value is seen.
doneValue uint,
) func(event PeerConnReadExtensionMessageEvent) {
return func(event PeerConnReadExtensionMessageEvent) {
// Read handshake, don't look it up.
if event.ExtensionNumber == 0 {
return
}
name, builtin, err := event.PeerConn.LocalLtepProtocolMap.LookupId(event.ExtensionNumber)
c.Assert(err, qt.IsNil)
// Not a user protocol.
if builtin {
return
}
switch name {
case answerToName:
u64, err := strconv.ParseUint(string(event.Payload), 10, 0)
c.Assert(err, qt.IsNil)
i := uint(u64)
c.Logf("%v got %d", handlerName, i)
if i == doneValue {
wg.Done()
return
}
c.Assert(i%2, qt.Equals, expectedMod2)
go func() {
c.Assert(
event.PeerConn.WriteExtendedMessage(
replyToName,
[]byte(strconv.FormatUint(uint64(i+1), 10))),
qt.IsNil)
}()
default:
c.Fatalf("got unexpected extension name %q", name)
}
}
}

func TestUserLtep(t *testing.T) {
c := qt.New(t)
var wg sync.WaitGroup

makeCfg := func() *ClientConfig {
cfg := TestingConfig(t)
// Only want a single connection to between the clients.
cfg.DisableUTP = true
cfg.DisableIPv6 = true
return cfg
}

evensCfg := makeCfg()
evensCfg.Callbacks.ReadExtendedHandshake = func(pc *PeerConn, msg *pp.ExtendedHandshakeMessage) {
// The client lock is held while handling this event, so we have to do synchronous work in a
// separate goroutine.
go func() {
// Check sending an extended message for a protocol the peer doesn't support is an error.
c.Check(pc.WriteExtendedMessage("pm_me_floats", []byte("3.142")), qt.IsNotNil)
// Kick things off by sending a 1.
c.Check(pc.WriteExtendedMessage(testRepliesToOddsExtensionName, []byte("1")), qt.IsNil)
}()
}
evensCfg.Callbacks.PeerConnReadExtensionMessage = append(
evensCfg.Callbacks.PeerConnReadExtensionMessage,
countHandler(c, &wg, "evens", 0, testRepliesToEvensExtensionName, testRepliesToOddsExtensionName, 100))
evensCfg.Callbacks.PeerConnAdded = append(evensCfg.Callbacks.PeerConnAdded, func(conn *PeerConn) {
conn.LocalLtepProtocolMap.AddUserProtocol(testRepliesToEvensExtensionName)
c.Assert(conn.LocalLtepProtocolMap.Index[conn.LocalLtepProtocolMap.NumBuiltin:], qt.HasLen, 1)
})

oddsCfg := makeCfg()
oddsCfg.Callbacks.PeerConnAdded = append(oddsCfg.Callbacks.PeerConnAdded, func(conn *PeerConn) {
conn.LocalLtepProtocolMap.AddUserProtocol(testRepliesToOddsExtensionName)
c.Assert(conn.LocalLtepProtocolMap.Index[conn.LocalLtepProtocolMap.NumBuiltin:], qt.HasLen, 1)
})
oddsCfg.Callbacks.PeerConnReadExtensionMessage = append(
oddsCfg.Callbacks.PeerConnReadExtensionMessage,
countHandler(c, &wg, "odds", 1, testRepliesToOddsExtensionName, testRepliesToEvensExtensionName, 100))

cl1, err := NewClient(oddsCfg)
c.Assert(err, qt.IsNil)
defer cl1.Close()
cl2, err := NewClient(evensCfg)
c.Assert(err, qt.IsNil)
defer cl2.Close()
addOpts := AddTorrentOpts{}
t1, _ := cl1.AddTorrentOpt(addOpts)
t2, _ := cl2.AddTorrentOpt(addOpts)
defer testutil.ExportStatusWriter(cl1, "cl1", t)()
defer testutil.ExportStatusWriter(cl2, "cl2", t)()
// Expect one PeerConn to see the value.
wg.Add(1)
added := t1.AddClientPeer(cl2)
// Ensure some addresses for the other client were added.
c.Assert(added, qt.Not(qt.Equals), 0)
wg.Wait()
_ = t2
}
7 changes: 7 additions & 0 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,13 @@ func (p *Peer) close() {
}
}

func (p *Peer) Close() error {
p.locker().Lock()
defer p.locker().Unlock()
p.close()
return nil
}

// Peer definitely has a piece, for purposes of requesting. So it's not sufficient that we think
// they do (known=true).
func (cn *Peer) peerHasPiece(piece pieceIndex) bool {
Expand Down
2 changes: 1 addition & 1 deletion peer_protocol/extended.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type (
}

ExtensionName string
ExtensionNumber int
ExtensionNumber uint8
)

const (
Expand Down
Loading

0 comments on commit 8605abc

Please sign in to comment.