From 0df37523246324835a7b3df5d1a2cbfac93496ed Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 2 Apr 2024 18:21:22 +1100 Subject: [PATCH] Don't starve unverified bytes limit on unrequestable pieces --- request-strategy-impls.go | 10 +++---- request-strategy/NOTES.md | 27 +++++++++++++++++++ request-strategy/order.go | 22 ++++++++------- request-strategy/piece.go | 6 ++++- requesting.go | 7 ++--- tests/webseed-partial-seed/.gitignore | 4 +++ tests/webseed-partial-seed/README | 3 +++ tests/webseed-partial-seed/herp_test.go | 13 ++++----- tests/webseed-partial-seed/justfile | 10 +++++++ .../webseed-partial-seed}/test.img.torrent | 0 10 files changed, 78 insertions(+), 24 deletions(-) create mode 100644 request-strategy/NOTES.md create mode 100644 tests/webseed-partial-seed/.gitignore create mode 100644 tests/webseed-partial-seed/README create mode 100644 tests/webseed-partial-seed/justfile rename {testdata => tests/webseed-partial-seed}/test.img.torrent (100%) diff --git a/request-strategy-impls.go b/request-strategy-impls.go index 8b1e338291..dfcec28ae4 100644 --- a/request-strategy-impls.go +++ b/request-strategy-impls.go @@ -88,12 +88,12 @@ type requestStrategyPiece struct { p *Piece } -func (r requestStrategyPiece) Request() bool { - return !r.p.ignoreForRequests() && r.p.purePriority() != PiecePriorityNone +func (r requestStrategyPiece) CountUnverified() bool { + return r.p.hashing || r.p.marking || r.p.queuedForHash() } -func (r requestStrategyPiece) NumPendingChunks() int { - return int(r.p.t.pieceNumPendingChunks(r.p.index)) +func (r requestStrategyPiece) Request() bool { + return !r.p.ignoreForRequests() && r.p.purePriority() != PiecePriorityNone } -var _ request_strategy.Piece = (*requestStrategyPiece)(nil) +var _ request_strategy.Piece = requestStrategyPiece{} diff --git a/request-strategy/NOTES.md b/request-strategy/NOTES.md new file mode 100644 index 0000000000..6e061d1bb4 --- /dev/null +++ b/request-strategy/NOTES.md @@ -0,0 +1,27 @@ +Rough notes on how requests are determined. + +piece ordering cache: + +- pieces are grouped by shared storage capacity and ordered by priority, availability, index and then infohash. +- if a torrent does not have a storage cap, pieces are also filtered by whether requests should currently be made for them. this is because for torrents without a storage cap, there's no need to consider pieces that won't be requested. + +building a list of candidate requests for a peer: + +- pieces are scanned in order of the pre-sorted order for the storage group. +- scanning stops when the cumulative piece length so far exceeds the storage capacity. +- pieces are filtered by whether requests should currently be made for them (hashing, marking, already complete, etc.) +- if requests were added to the consideration list, or the piece was in a partial state, the piece length is added to a cumulative total of unverified bytes. +- if the cumulative total of unverified bytes reaches the configured limit (default 64MiB), piece scanning is halted. + +applying request state: + +- send the appropriate interest message if our interest doesn't match what the peer is seeing +- sort all candidate requests by: + - allowed fast if we're being choked, + - piece priority, + - whether the request is already outstanding to the peer, + - whether the request is not pending from any peer + - if the request is outstanding from a peer: + - how many outstanding requests the existing peer has + - most recently requested + - least available piece diff --git a/request-strategy/order.go b/request-strategy/order.go index 6b27261139..9f0295c60f 100644 --- a/request-strategy/order.go +++ b/request-strategy/order.go @@ -42,7 +42,8 @@ func pieceOrderLess(i, j *pieceRequestOrderItem) multiless.Computation { // Calls f with requestable pieces in order. func GetRequestablePieces( input Input, pro *PieceRequestOrder, - f func(ih metainfo.Hash, pieceIndex int, orderState PieceRequestOrderState), + // Returns true if the piece should be considered against the unverified bytes limit. + requestPiece func(ih metainfo.Hash, pieceIndex int, orderState PieceRequestOrderState) bool, ) { // Storage capacity left for this run, keyed by the storage capacity pointer on the storage // TorrentImpl. A nil value means no capacity limit. @@ -59,23 +60,26 @@ func GetRequestablePieces( var t = input.Torrent(ih) var piece = t.Piece(item.key.Index) pieceLength := t.PieceLength() + // Storage limits will always apply against requestable pieces, since we need to keep the + // highest priority pieces, even if they're complete or in an undesirable state. if storageLeft != nil { if *storageLeft < pieceLength { return false } *storageLeft -= pieceLength } - if !piece.Request() || piece.NumPendingChunks() == 0 { - // TODO: Clarify exactly what is verified. Stuff that's being hashed should be - // considered unverified and hold up further requests. + if piece.Request() { + if !requestPiece(ih, item.key.Index, item.state) { + // No blocks are being considered from this piece, so it won't result in unverified + // bytes. + return true + } + } else if !piece.CountUnverified() { + // The piece is pristine, and we're not considering it for requests. return true } - if maxUnverifiedBytes != 0 && allTorrentsUnverifiedBytes+pieceLength > maxUnverifiedBytes { - return false - } allTorrentsUnverifiedBytes += pieceLength - f(ih, item.key.Index, item.state) - return true + return maxUnverifiedBytes == 0 || allTorrentsUnverifiedBytes < maxUnverifiedBytes }) return } diff --git a/request-strategy/piece.go b/request-strategy/piece.go index 8483da45d7..02c9ff6589 100644 --- a/request-strategy/piece.go +++ b/request-strategy/piece.go @@ -1,6 +1,10 @@ package requestStrategy type Piece interface { + // Whether requests should be made for this piece. This would be false for cases like the piece + // is currently being hashed, or already complete. Request() bool - NumPendingChunks() int + // Whether the piece should be counted towards the unverified bytes limit. The intention is to + // prevent pieces being starved from the opportunity to move to the completed state. + CountUnverified() bool } diff --git a/requesting.go b/requesting.go index 5d79238d09..51419a3599 100644 --- a/requesting.go +++ b/requesting.go @@ -191,12 +191,12 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) { requestStrategy.GetRequestablePieces( input, t.getPieceRequestOrder(), - func(ih InfoHash, pieceIndex int, pieceExtra requestStrategy.PieceRequestOrderState) { + func(ih InfoHash, pieceIndex int, pieceExtra requestStrategy.PieceRequestOrderState) bool { if ih != *t.canonicalShortInfohash() { - return + return false } if !p.peerHasPiece(pieceIndex) { - return + return false } requestHeap.pieceStates[pieceIndex].Set(pieceExtra) allowedFast := p.peerAllowedFast.Contains(pieceIndex) @@ -222,6 +222,7 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) { } requestHeap.requestIndexes = append(requestHeap.requestIndexes, r) }) + return true }, ) t.assertPendingRequests() diff --git a/tests/webseed-partial-seed/.gitignore b/tests/webseed-partial-seed/.gitignore new file mode 100644 index 0000000000..32470d663a --- /dev/null +++ b/tests/webseed-partial-seed/.gitignore @@ -0,0 +1,4 @@ +/bin/ +/lib/ +/include/ +pyvenv.cfg diff --git a/tests/webseed-partial-seed/README b/tests/webseed-partial-seed/README new file mode 100644 index 0000000000..b1f738e781 --- /dev/null +++ b/tests/webseed-partial-seed/README @@ -0,0 +1,3 @@ +This directory does tests for https://github.com/anacrolix/torrent/discussions/916. See the justfile too. + +You want to ensure that the seeder and leecher progress completed pieces in lock step. The bug was that the leecher would reach the end of its max unverified bytes window before hitting a piece that the seeder had available. diff --git a/tests/webseed-partial-seed/herp_test.go b/tests/webseed-partial-seed/herp_test.go index 3c1b731f2d..8eedff3685 100644 --- a/tests/webseed-partial-seed/herp_test.go +++ b/tests/webseed-partial-seed/herp_test.go @@ -1,17 +1,18 @@ package webseed_partial_seed import ( - "github.com/anacrolix/torrent" - "github.com/anacrolix/torrent/internal/testutil" - qt "github.com/frankban/quicktest" "path/filepath" "runtime" "testing" + + "github.com/anacrolix/torrent" + "github.com/anacrolix/torrent/internal/testutil" + qt "github.com/frankban/quicktest" ) -func testdataDir() string { +func testSrcDir() string { _, b, _, _ := runtime.Caller(0) - return filepath.Join(filepath.Dir(b), "../../testdata") + return filepath.Dir(b) } func makeSeederClient(t *testing.T) *torrent.Client { @@ -54,7 +55,7 @@ func TestWebseedPartialSeed(t *testing.T) { defer seederClient.Close() testutil.ExportStatusWriter(seederClient, "seeder", t) const infoHashHex = "a88fda5954e89178c372716a6a78b8180ed4dad3" - metainfoPath := filepath.Join(testdataDir(), "test.img.torrent") + metainfoPath := filepath.Join(testSrcDir(), "test.img.torrent") seederTorrent, err := seederClient.AddTorrentFromFile(metainfoPath) assertOk(err) leecherClient := makeLeecherClient(t) diff --git a/tests/webseed-partial-seed/justfile b/tests/webseed-partial-seed/justfile new file mode 100644 index 0000000000..0f1391a100 --- /dev/null +++ b/tests/webseed-partial-seed/justfile @@ -0,0 +1,10 @@ +setup: + python3 -m venv . + bin/pip install rangehttpserver + mkfile -n 500m test.img + +run-server: + bin/python -m RangeHTTPServer 3003 + +run-test: + GOPPROF=http go test -race -v . diff --git a/testdata/test.img.torrent b/tests/webseed-partial-seed/test.img.torrent similarity index 100% rename from testdata/test.img.torrent rename to tests/webseed-partial-seed/test.img.torrent