feat: limit asynchronicity for optimistic provide
dennis-tra committed Oct 30, 2022
1 parent b0288f4 commit fb6fa01
Showing 7 changed files with 82 additions and 45 deletions.
12 changes: 9 additions & 3 deletions dht.go
@@ -148,8 +148,9 @@ type IpfsDHT struct {
rtFreezeTimeout time.Duration

// network size estimator
nsEstimator *netsize.Estimator
enableOptimisticProvide bool
nsEstimator *netsize.Estimator
enableOptProv bool
optProvJobsPool chan struct{}

// configuration variables for tests
testAddressUpdateProcessing bool
@@ -304,7 +305,8 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err
addPeerToRTChan: make(chan addPeerRTReq),
refreshFinishedCh: make(chan struct{}),

enableOptimisticProvide: cfg.EnableOptimisticProvide,
enableOptProv: cfg.EnableOptimisticProvide,
optProvJobsPool: nil,
}

var maxLastSuccessfulOutboundThreshold time.Duration
@@ -333,6 +335,10 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err
// init network size estimator
dht.nsEstimator = netsize.NewEstimator(h.ID(), rt, cfg.BucketSize)

if dht.enableOptProv {
dht.optProvJobsPool = make(chan struct{}, cfg.OptimisticProvideJobsPoolSize)
}

// rt refresh manager
rtRefresh, err := makeRtRefreshManager(dht, cfg, maxLastSuccessfulOutboundThreshold)
if err != nil {
9 changes: 9 additions & 0 deletions dht_options.go
@@ -317,3 +317,12 @@ func EnableOptimisticProvide() Option {
return nil
}
}

// OptimisticProvideJobsPoolSize allows configuring the asynchronicity limit for in-flight ADD_PROVIDER RPCs.
// It makes sense to set it to a multiple of OptProvReturnRatio * BucketSize.
func OptimisticProvideJobsPoolSize(size int) Option {
return func(c *dhtcfg.Config) error {
c.OptimisticProvideJobsPoolSize = size
return nil
}
}
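
For orientation, here is a hedged usage sketch (not part of this commit) of how the new option might be combined with EnableOptimisticProvide when constructing a DHT. Only the two option names and the default pool size of 60 come from this changeset; the host setup and the value 120 are illustrative assumptions.

package main

import (
	"context"
	"fmt"

	"github.com/libp2p/go-libp2p"
	dht "github.com/libp2p/go-libp2p-kad-dht"
)

func main() {
	ctx := context.Background()

	h, err := libp2p.New()
	if err != nil {
		panic(err)
	}

	// Hypothetical configuration: enable optimistic provide and raise the
	// in-flight ADD_PROVIDER limit from the default of 60 to 120.
	kadDHT, err := dht.New(ctx, h,
		dht.EnableOptimisticProvide(),
		dht.OptimisticProvideJobsPoolSize(120),
	)
	if err != nil {
		panic(err)
	}
	defer kadDHT.Close()

	fmt.Println("optimistic provide enabled on", h.ID())
}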
5 changes: 4 additions & 1 deletion internal/config/config.go
@@ -63,7 +63,8 @@ type Config struct {
DisableFixLowPeers bool
TestAddressUpdateProcessing bool

EnableOptimisticProvide bool
EnableOptimisticProvide bool
OptimisticProvideJobsPoolSize int
}

func EmptyQueryFilter(_ interface{}, ai peer.AddrInfo) bool { return true }
@@ -122,6 +123,8 @@ var Defaults = func(o *Config) error {
o.Concurrency = 10
o.Resiliency = 3

o.OptimisticProvideJobsPoolSize = 60

return nil
}

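The default of 60 set above lines up with the sizing hint in the option's doc comment (a multiple of OptProvReturnRatio * BucketSize). The small hypothetical helper below makes the arithmetic explicit; the bucket size of 20 and the return ratio of 0.75 are assumptions for illustration, not values taken from this commit.

package main

import (
	"fmt"
	"math"
)

// poolSizeFor is a hypothetical helper: pick the pool size as a multiple of
// the per-provide return threshold, OptProvReturnRatio * BucketSize.
func poolSizeFor(bucketSize int, returnRatio float64, multiple int) int {
	returnThreshold := int(math.Ceil(float64(bucketSize) * returnRatio))
	return multiple * returnThreshold
}

func main() {
	fmt.Println(poolSizeFor(20, 0.75, 4)) // 4 * ceil(20 * 0.75) = 60
}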
4 changes: 2 additions & 2 deletions lookup.go
@@ -5,9 +5,9 @@ import (
"fmt"
"time"

"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/libp2p/go-libp2p-kad-dht/qpeerset"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"

kb "github.com/libp2p/go-libp2p-kbucket"
)
81 changes: 55 additions & 26 deletions lookup_estim.go
@@ -5,12 +5,13 @@ import (
"fmt"
"math"
"sync"
"sync/atomic"
"time"

"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-kad-dht/netsize"
"github.com/libp2p/go-libp2p-kad-dht/qpeerset"
kb "github.com/libp2p/go-libp2p-kbucket"
"github.com/libp2p/go-libp2p/core/peer"
ks "github.com/whyrusleeping/go-keyspace"
"gonum.org/v1/gonum/mathext"
)
@@ -73,6 +74,9 @@ type estimatorState struct {

// number of completed (regardless of success) ADD_PROVIDER RPCs before we return control back to the user.
returnThreshold int

// putProvDone counts the ADD_PROVIDER RPCs that have completed (successful and unsuccessful)
putProvDone atomic.Int32
}

func (dht *IpfsDHT) newEstimatorState(ctx context.Context, key string) (*estimatorState, error) {
@@ -97,6 +101,7 @@ func (dht *IpfsDHT) newEstimatorState(ctx context.Context, key string) (*estimat
individualThreshold: individualThreshold,
setThreshold: setThreshold,
returnThreshold: returnThreshold,
putProvDone: atomic.Int32{},
}, nil
}

@@ -153,7 +158,7 @@ func (dht *IpfsDHT) GetAndProvideToClosestPeers(outerCtx context.Context, key st
es.peerStatesLk.Unlock()

// wait until a threshold number of RPCs have completed
es.waitForRPCs(es.returnThreshold)
es.waitForRPCs()

if outerCtx.Err() == nil && lookupRes.completed { // likely the "completed" field is false but that's not a given

@@ -236,38 +241,62 @@ func (es *estimatorState) putProviderRecord(pid peer.ID) {
es.doneChan <- struct{}{}
}

func (es *estimatorState) waitForRPCs(returnThreshold int) {
// waitForRPCs waits for a subset of ADD_PROVIDER RPCs to complete and then acquires a lease on
// a bounded channel before returning control to the user, preventing unbounded asynchronicity.
// If there are already too many requests in flight, we just wait for our current set to
// finish.
func (es *estimatorState) waitForRPCs() {
es.peerStatesLk.RLock()
rpcCount := len(es.peerStates)
es.peerStatesLk.RUnlock()

// returnThreshold can't be larger than the total number of issued RPCs
if returnThreshold > rpcCount {
returnThreshold = rpcCount
if es.returnThreshold > rpcCount {
es.returnThreshold = rpcCount
}

var wg sync.WaitGroup
wg.Add(1)
go func() {
dones := 0
for range es.doneChan {
dones += 1
// Indicate to the wait group that returnThreshold RPCs have finished but
// don't break here. Keep go-routine around so that the putProviderRecord
// go-routines can write to the done channel and everything gets cleaned
// up properly.
if dones == returnThreshold {
wg.Done()
}

// If the total number RPCs was reached break for loop and close the done channel.
if dones == rpcCount {
break
// Wait until returnThreshold ADD_PROVIDER RPCs have returned
for range es.doneChan {
if int(es.putProvDone.Add(1)) == es.returnThreshold {
break
}
}
// At this point only a subset of all ADD_PROVIDER RPCs have completed.
// We want to give control back to the user as soon as possible because
// it is highly likely that at least one of the remaining RPCs will time
// out and thus slow down the whole process. The provider records will
// already be available even though not all RPCs have finished.
// This has been investigated here:
// https://github.com/protocol/network-measurements/blob/master/results/rfm17-provider-record-liveness.md

// For the remaining ADD_PROVIDER RPCs, try to acquire a lease on the optProvJobsPool channel.
// If that works, we need to consume the doneChan and release the acquired lease on the
// optProvJobsPool channel.
remaining := rpcCount - int(es.putProvDone.Load())
for i := 0; i < remaining; i++ {
select {
case es.dht.optProvJobsPool <- struct{}{}:
// We were able to acquire a lease on the optProvJobsPool channel.
// Consume doneChan to release the acquired lease again.
go es.consumeDoneChan(rpcCount)
case <-es.doneChan:
// We were not able to acquire a lease but an ADD_PROVIDER RPC resolved.
if int(es.putProvDone.Add(1)) == rpcCount {
close(es.doneChan)
}
}
close(es.doneChan)
}()
}
}

func (es *estimatorState) consumeDoneChan(until int) {
// Wait for an RPC to finish
<-es.doneChan

// wait until returnThreshold ADD_PROVIDER RPCs have finished
wg.Wait()
// Release the acquired lease so others can get a spot
<-es.dht.optProvJobsPool

// If all RPCs have finished, close the channel.
if int(es.putProvDone.Add(1)) == until {
close(es.doneChan)
}
}
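
To make the new control flow easier to follow, here is a minimal, self-contained sketch of the same bounded-asynchronicity pattern: wait for a threshold of completions, then give each remaining job either a slot from a bounded pool (released by a helper goroutine once that job finishes) or absorb its completion inline before returning. All names and numbers are hypothetical; only the structure mirrors waitForRPCs and consumeDoneChan above.

package main

import (
	"fmt"
	"time"
)

func main() {
	const (
		jobs            = 10 // total "RPCs" issued
		returnThreshold = 4  // return to the caller after this many complete
		poolSize        = 3  // at most this many jobs may outlive the call
	)

	done := make(chan struct{})
	pool := make(chan struct{}, poolSize)

	for i := 0; i < jobs; i++ {
		go func(i int) {
			time.Sleep(time.Duration(i+1) * 10 * time.Millisecond) // fake RPC
			done <- struct{}{}
		}(i)
	}

	// Phase 1: block until the threshold number of jobs has completed.
	completed := 0
	for range done {
		completed++
		if completed == returnThreshold {
			break
		}
	}

	// Phase 2: for each remaining job, either acquire a pool slot (a helper
	// goroutine then consumes the completion and releases the slot) or, if
	// the pool is full, absorb a completion inline before continuing.
	for i := completed; i < jobs; i++ {
		select {
		case pool <- struct{}{}:
			go func() {
				<-done // wait for one outstanding job
				<-pool // release the slot
			}()
		case <-done:
		}
	}

	fmt.Println("returned with outstanding work bounded by the pool")
}

As in the commit, the caller gets control back as soon as the threshold is met, while the number of completions still pending in the background is capped by the pool's capacity.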
7 changes: 3 additions & 4 deletions netsize/netsize.go
@@ -10,15 +10,14 @@ import (
"time"

logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/peer"
kbucket "github.com/libp2p/go-libp2p-kbucket"
"github.com/libp2p/go-libp2p/core/peer"
ks "github.com/whyrusleeping/go-keyspace"
)

var (
ErrNotEnoughData = fmt.Errorf("not enough data")
ErrWrongNumOfPeers = fmt.Errorf("expected bucket size number of peers")
ErrUncertaintyTooHigh = fmt.Errorf("estimate uncertainty too high") // TODO: unused
ErrNotEnoughData = fmt.Errorf("not enough data")
ErrWrongNumOfPeers = fmt.Errorf("expected bucket size number of peers")
)

var (
9 changes: 0 additions & 9 deletions netsize/netsize_test.go

This file was deleted.
