feat: add experimental optimistic provide (#783)
* feat: netsize package

* feat: initialize netsize estimator

* fix: move estimator tracking to get closest peers

* feat: OptimisticProvide method

* feat: improve go-routine handling

* fix: use correct add provider rpc context

* feat: debug logs

* feat: complex context handling

* chore: remove debug directives

* chore: remove obsolete least square fit

* fix: staticcheck error

* fix: don't recalculate return threshold

* feat: limit asynchronicity for optimistic provide

* optprov: make weight function configurable

* remove: case of completed optprov lookup

I observed that the network size estimation is not correct for the
optprov case in comparison to the classic-only measurement. I think the
reason lies in the function that got removed here.

* Revert "optprov: make weight function configurable"

This reverts commit 0e77534.

* refactor: remove unnecessary variable

* test: add rudimentary netsize test

* Revert "remove: case of completed optprov lookup"

This reverts commit f6da739.

* improve: weight calculation for non-full buckets

* Update dht_options.go

Co-authored-by: Jorropo <[email protected]>

* incorporate pr feedback

* fix: tests

* Expose DHT network size estimation

* Fix deprecated API

* Expose network size estimation metric

* add optimistic provide smoke test

* un-export optimistic provide constants

* Mark optimistic provide as experimental

* clarify optimistic provide code comments

---------

Co-authored-by: Jorropo <[email protected]>
dennis-tra and Jorropo authored Apr 5, 2023
1 parent dd27ddd commit 32fbe47
Showing 13 changed files with 901 additions and 49 deletions.
27 changes: 26 additions & 1 deletion dht.go
@@ -21,6 +21,7 @@ import (
dhtcfg "github.com/libp2p/go-libp2p-kad-dht/internal/config"
"github.com/libp2p/go-libp2p-kad-dht/internal/net"
"github.com/libp2p/go-libp2p-kad-dht/metrics"
"github.com/libp2p/go-libp2p-kad-dht/netsize"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
"github.com/libp2p/go-libp2p-kad-dht/providers"
"github.com/libp2p/go-libp2p-kad-dht/rtrefresh"
@@ -147,6 +148,13 @@ type IpfsDHT struct {

rtFreezeTimeout time.Duration

// network size estimator
nsEstimator *netsize.Estimator
enableOptProv bool

// a bounded channel to limit the asynchronicity of in-flight ADD_PROVIDER RPCs
optProvJobsPool chan struct{}

// configuration variables for tests
testAddressUpdateProcessing bool
}
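
The optProvJobsPool field uses a buffered channel as a counting semaphore: sends acquire a slot, receives release one, and the capacity caps how many ADD_PROVIDER RPCs run in the background at once. A minimal, self-contained sketch of the pattern (illustrative names, not code from this commit):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Buffered channel as a counting semaphore: the capacity bounds the
	// number of concurrently running background jobs.
	pool := make(chan struct{}, 3)

	tryStart := func(id int) bool {
		select {
		case pool <- struct{}{}: // acquire a slot if one is free
			go func() {
				defer func() { <-pool }()         // release the slot when done
				time.Sleep(10 * time.Millisecond) // stand-in for an ADD_PROVIDER RPC
				fmt.Println("job", id, "done")
			}()
			return true
		default:
			return false // pool is full; the caller falls back to a synchronous path
		}
	}

	for i := 0; i < 5; i++ {
		fmt.Println("job", i, "accepted:", tryStart(i))
	}
	time.Sleep(50 * time.Millisecond) // wait for background jobs in this demo
}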
@@ -298,6 +306,9 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err

addPeerToRTChan: make(chan addPeerRTReq),
refreshFinishedCh: make(chan struct{}),

enableOptProv: cfg.EnableOptimisticProvide,
optProvJobsPool: nil,
}

var maxLastSuccessfulOutboundThreshold time.Duration
@@ -323,6 +334,13 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err
dht.routingTable = rt
dht.bootstrapPeers = cfg.BootstrapPeers

// init network size estimator
dht.nsEstimator = netsize.NewEstimator(h.ID(), rt, cfg.BucketSize)

if dht.enableOptProv {
dht.optProvJobsPool = make(chan struct{}, cfg.OptimisticProvideJobsPoolSize)
}

// rt refresh manager
rtRefresh, err := makeRtRefreshManager(dht, cfg, maxLastSuccessfulOutboundThreshold)
if err != nil {
@@ -838,13 +856,20 @@ func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
return dht.protoMessenger.Ping(ctx, p)
}

// NetworkSize returns the most recent estimation of the DHT network size.
// EXPERIMENTAL: We do not provide any guarantees that this method will
// continue to exist in the codebase. Use it at your own risk.
func (dht *IpfsDHT) NetworkSize() (int32, error) {
return dht.nsEstimator.NetworkSize()
}
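
A short usage sketch for this accessor (hypothetical helper; assumes "fmt" and this package are imported, and that some lookups have already completed so the estimator has data):

// Hypothetical helper around the experimental API.
func printNetworkSize(d *dht.IpfsDHT) {
	// NetworkSize returns an error until the estimator has tracked
	// enough successful lookups to produce an estimate.
	size, err := d.NetworkSize()
	if err != nil {
		fmt.Printf("no network size estimate yet: %s\n", err)
		return
	}
	fmt.Printf("estimated DHT network size: %d peers\n", size)
}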

// newContextWithLocalTags returns a new context.Context with the InstanceID and
// PeerID keys populated. It will also take any extra tags that need adding to
// the context as tag.Mutators.
func (dht *IpfsDHT) newContextWithLocalTags(ctx context.Context, extraTags ...tag.Mutator) context.Context {
extraTags = append(
extraTags,
tag.Upsert(metrics.KeyPeerID, dht.self.Pretty()),
tag.Upsert(metrics.KeyPeerID, dht.self.String()),
tag.Upsert(metrics.KeyInstanceID, fmt.Sprintf("%p", dht)),
)
ctx, _ = tag.New(
28 changes: 28 additions & 0 deletions dht_options.go
@@ -309,3 +309,31 @@ func forceAddressUpdateProcessing(t *testing.T) Option {
return nil
}
}

// EnableOptimisticProvide enables an optimization that skips the last hops of the provide process.
// This works by using the network size estimator (which uses the keyspace density of queries)
// to optimistically send ADD_PROVIDER requests when we have most likely found the last hop.
// It will also run some ADD_PROVIDER requests asynchronously in the background after returning;
// this allows the provide call to return early once a threshold number of RPCs have succeeded.
// The number of background/in-flight queries can be configured with the
// OptimisticProvideJobsPoolSize option.
//
// EXPERIMENTAL: This is an experimental option and might be removed in the future. Use at your own risk.
func EnableOptimisticProvide() Option {
return func(c *dhtcfg.Config) error {
c.EnableOptimisticProvide = true
return nil
}
}

// OptimisticProvideJobsPoolSize configures the asynchronicity limit for in-flight ADD_PROVIDER RPCs.
// It makes sense to set it to a multiple of optProvReturnRatio * BucketSize. See the description of
// EnableOptimisticProvide for more details.
//
// EXPERIMENTAL: This is an experimental option and might be removed in the future. Use at your own risk.
func OptimisticProvideJobsPoolSize(size int) Option {
return func(c *dhtcfg.Config) error {
c.OptimisticProvideJobsPoolSize = size
return nil
}
}
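
Taken together, a caller opts in at construction time. A sketch assuming only this package's public API (the host setup is illustrative):

package main

import (
	"context"

	"github.com/libp2p/go-libp2p"
	dht "github.com/libp2p/go-libp2p-kad-dht"
)

func main() {
	ctx := context.Background()

	h, err := libp2p.New()
	if err != nil {
		panic(err)
	}

	// Both options are experimental and may be removed in a future release.
	kad, err := dht.New(ctx, h,
		dht.EnableOptimisticProvide(),
		dht.OptimisticProvideJobsPoolSize(120), // default is 60
	)
	if err != nil {
		panic(err)
	}
	defer kad.Close()
}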
1 change: 1 addition & 0 deletions go.mod
@@ -33,6 +33,7 @@ require (
go.opentelemetry.io/otel v1.14.0
go.opentelemetry.io/otel/trace v1.14.0
go.uber.org/zap v1.24.0
gonum.org/v1/gonum v0.11.0
)

require (
2 changes: 2 additions & 0 deletions go.sum
@@ -917,6 +917,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gonum.org/v1/gonum v0.11.0 h1:f1IJhK4Km5tBJmaiJXtk/PkL4cdVX6J+tGiM187uT5E=
gonum.org/v1/gonum v0.11.0/go.mod h1:fSG4YDCxxUZQJ7rKsQrj0gMOg00Il0Z96/qMA4bVQhA=
google.golang.org/api v0.0.0-20180910000450-7ca32eb868bf/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
google.golang.org/api v0.0.0-20181030000543-1d582fd0359e/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
google.golang.org/api v0.1.0/go.mod h1:UGEZY7KEX120AnNLIHFMKIo4obdJhkp2tPbaPlQx13Y=
6 changes: 6 additions & 0 deletions internal/config/config.go
@@ -62,6 +62,9 @@ type Config struct {
// test specific Config options
DisableFixLowPeers bool
TestAddressUpdateProcessing bool

EnableOptimisticProvide bool
OptimisticProvideJobsPoolSize int
}

func EmptyQueryFilter(_ interface{}, ai peer.AddrInfo) bool { return true }
@@ -120,6 +123,9 @@ var Defaults = func(o *Config) error {
o.Concurrency = 10
o.Resiliency = 3

// MAGIC: It makes sense to set it to a multiple of OptProvReturnRatio * BucketSize. We chose a multiple of 4.
o.OptimisticProvideJobsPoolSize = 60

return nil
}
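
To unpack the magic number: with the default bucket size of 20 and a return ratio of 0.75 (the ratio is an un-exported constant, so its value here is an assumption), a multiple of 4 yields the chosen default:

// Illustrative arithmetic only, not code from this commit.
const (
	bucketSize         = 20   // default k
	optProvReturnRatio = 0.75 // assumed value of the un-exported constant
)

// Roughly optProvReturnRatio*bucketSize = 15 ADD_PROVIDER RPCs can be
// outstanding per provide call; a multiple of 4 leaves headroom for
// about four concurrent provides.
const poolSize = 4 * optProvReturnRatio * bucketSize // = 60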

91 changes: 56 additions & 35 deletions lookup.go
@@ -6,6 +6,8 @@ import (
"time"

"github.com/libp2p/go-libp2p-kad-dht/internal"
"github.com/libp2p/go-libp2p-kad-dht/metrics"
"github.com/libp2p/go-libp2p-kad-dht/qpeerset"
kb "github.com/libp2p/go-libp2p-kbucket"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
@@ -26,47 +28,66 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID,
if key == "" {
return nil, fmt.Errorf("can't lookup empty key")
}
// TODO: I can break the interface! return []peer.ID
lookupRes, err := dht.runLookupWithFollowup(ctx, key,
func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
// For DHT query command
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.SendingQuery,
ID: p,
})

mctx, mspan := internal.StartSpan(ctx, "protoMessenger.GetClosestPeers", trace.WithAttributes(attribute.Stringer("peer", p)))
peers, err := dht.protoMessenger.GetClosestPeers(mctx, p, peer.ID(key))
if err != nil {
if mspan.IsRecording() {
mspan.SetStatus(codes.Error, err.Error())
}
mspan.End()
logger.Debugf("error getting closer peers: %s", err)
return nil, err
}
mspan.End()

// For DHT query command
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.PeerResponse,
ID: p,
Responses: peers,
})

return peers, err
},
func() bool { return false },
)
// TODO: I can break the interface! return []peer.ID
lookupRes, err := dht.runLookupWithFollowup(ctx, key, dht.pmGetClosestPeers(key), func(*qpeerset.QueryPeerset) bool { return false })

if err != nil {
return nil, err
}

if ctx.Err() == nil && lookupRes.completed {
// refresh the cpl for this key as the query was successful
dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(key), time.Now())
if err := ctx.Err(); err != nil || !lookupRes.completed {
return lookupRes.peers, err
}

// track lookup results for the network size estimator
if err = dht.nsEstimator.Track(key, lookupRes.closest); err != nil {
logger.Warnf("network size estimator track peers: %s", err)
}

if ns, err := dht.nsEstimator.NetworkSize(); err == nil {
metrics.NetworkSize.M(int64(ns))
}

return lookupRes.peers, ctx.Err()
// refresh the cpl for this key as the query was successful
dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(key), time.Now())

return lookupRes.peers, nil
}

// pmGetClosestPeers is the protocol messenger version of the GetClosestPeer queryFn.
func (dht *IpfsDHT) pmGetClosestPeers(key string) queryFn {
return func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
// For DHT query command
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.SendingQuery,
ID: p,
})

mctx, mspan := internal.StartSpan(ctx, "protoMessenger.GetClosestPeers", trace.WithAttributes(attribute.Stringer("peer", p)))
peers, err := dht.protoMessenger.GetClosestPeers(mctx, p, peer.ID(key))
if err != nil {
if mspan.IsRecording() {
mspan.SetStatus(codes.Error, err.Error())
}
mspan.End()
logger.Debugf("error getting closer peers: %s", err)
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.QueryError,
ID: p,
Extra: err.Error(),
})
return nil, err
}
mspan.End()

// For DHT query command
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.PeerResponse,
ID: p,
Responses: peers,
})

return peers, err
}
}
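
For intuition on what Track feeds the estimator: the optimistic provide design models the normalized XOR distance of the i-th closest peer to a random key as roughly i/(N+1) in a network of N peers, so fitting observed distances against the rank recovers N. A simplified sketch of that idea (the real implementation lives in the netsize package added by this commit and uses a weighted fit via gonum):

// estimateNetworkSize fits d_i ≈ i/(N+1) by least squares through the
// origin, where normDists[i] is the normalized XOR distance of the
// (i+1)-th closest peer to the lookup key.
func estimateNetworkSize(normDists []float64) float64 {
	var sumXX, sumXD float64
	for i, d := range normDists {
		x := float64(i + 1) // rank of the peer
		sumXX += x * x
		sumXD += x * d
	}
	slope := sumXD / sumXX // slope ≈ 1/(N+1)
	return 1/slope - 1     // network size estimate
}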