Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimistic Provide #783

Merged
merged 30 commits into from
Apr 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
301c42d
feat: netsize package
dennis-tra Jul 22, 2022
365fe9f
feat: initialize netsize estimator
dennis-tra Jul 22, 2022
b4c72d3
fix: move estimator tracking top get closest peers
dennis-tra Jul 22, 2022
6f663ba
feat: OptimisticProvide method
dennis-tra Jul 23, 2022
5f175f0
feat: improve go-routine handling
dennis-tra Jul 23, 2022
7e40a3d
fix: use correct add provider rpc context
dennis-tra Jul 24, 2022
911f338
feat: debug logs
dennis-tra Jul 27, 2022
2906a50
feat: complex context handling
dennis-tra Jul 27, 2022
6484e3c
chore: remove debug directives
dennis-tra Aug 3, 2022
6132110
chore: remove obsolete least square fit
dennis-tra Aug 4, 2022
126edfb
fix: staticcheck error
dennis-tra Aug 12, 2022
8fef3f7
fix: don't recalculate return threshold
dennis-tra Aug 12, 2022
7755e33
feat: limit asynchronicity for optimistic provide
dennis-tra Oct 30, 2022
4f6ad22
optprov: make weight function configurable
dennis-tra Nov 17, 2022
cd60302
remove: case of completed optprov lookup
dennis-tra Dec 7, 2022
2c5ff04
Revert "optprov: make weight function configurable"
dennis-tra Dec 8, 2022
a7e97af
refactor: remove unnecessary variable
dennis-tra Dec 8, 2022
c6c2ad7
test: add rudimentary netsize test
dennis-tra Dec 8, 2022
439702a
Revert "remove: case of completed optprov lookup"
dennis-tra Dec 9, 2022
de3d6b8
improve: weight calculation for non-full buckets
dennis-tra Dec 9, 2022
e5a275e
Update dht_options.go
dennis-tra Feb 7, 2023
709f968
incorporate pr feedback
dennis-tra Feb 21, 2023
76a7142
fix: tests
dennis-tra Feb 21, 2023
e2e76c7
Expose DHT network size estimation
dennis-tra Mar 30, 2023
92af8dd
Fix deprecated API
dennis-tra Mar 30, 2023
9cb79d1
Expose network size estimation metric
dennis-tra Mar 30, 2023
c12208c
add optimistic provide smoke test
dennis-tra Apr 3, 2023
895c382
un-export optimistic provide constants
dennis-tra Apr 5, 2023
0c0ad38
Mark optimistic provide as experimental
dennis-tra Apr 5, 2023
ce82af6
clarify optimistic provide code comments
dennis-tra Apr 5, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
dhtcfg "github.com/libp2p/go-libp2p-kad-dht/internal/config"
"github.com/libp2p/go-libp2p-kad-dht/internal/net"
"github.com/libp2p/go-libp2p-kad-dht/metrics"
"github.com/libp2p/go-libp2p-kad-dht/netsize"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
"github.com/libp2p/go-libp2p-kad-dht/providers"
"github.com/libp2p/go-libp2p-kad-dht/rtrefresh"
Expand Down Expand Up @@ -147,6 +148,13 @@ type IpfsDHT struct {

rtFreezeTimeout time.Duration

// network size estimator
nsEstimator *netsize.Estimator
enableOptProv bool

// a bound channel to limit asynchronicity of in-flight ADD_PROVIDER RPCs
optProvJobsPool chan struct{}

// configuration variables for tests
testAddressUpdateProcessing bool
}
Expand Down Expand Up @@ -298,6 +306,9 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err

addPeerToRTChan: make(chan addPeerRTReq),
refreshFinishedCh: make(chan struct{}),

enableOptProv: cfg.EnableOptimisticProvide,
optProvJobsPool: nil,
}

var maxLastSuccessfulOutboundThreshold time.Duration
Expand All @@ -323,6 +334,13 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err
dht.routingTable = rt
dht.bootstrapPeers = cfg.BootstrapPeers

// init network size estimator
dht.nsEstimator = netsize.NewEstimator(h.ID(), rt, cfg.BucketSize)

if dht.enableOptProv {
dht.optProvJobsPool = make(chan struct{}, cfg.OptimisticProvideJobsPoolSize)
}

// rt refresh manager
rtRefresh, err := makeRtRefreshManager(dht, cfg, maxLastSuccessfulOutboundThreshold)
if err != nil {
Expand Down Expand Up @@ -838,13 +856,20 @@ func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
return dht.protoMessenger.Ping(ctx, p)
}

// NetworkSize returns the most recent estimation of the DHT network size.
// EXPERIMENTAL: We do not provide any guarantees that this method will
// continue to exist in the codebase. Use it at your own risk.
func (dht *IpfsDHT) NetworkSize() (int32, error) {
return dht.nsEstimator.NetworkSize()
}

// newContextWithLocalTags returns a new context.Context with the InstanceID and
// PeerID keys populated. It will also take any extra tags that need adding to
// the context as tag.Mutators.
func (dht *IpfsDHT) newContextWithLocalTags(ctx context.Context, extraTags ...tag.Mutator) context.Context {
extraTags = append(
extraTags,
tag.Upsert(metrics.KeyPeerID, dht.self.Pretty()),
tag.Upsert(metrics.KeyPeerID, dht.self.String()),
tag.Upsert(metrics.KeyInstanceID, fmt.Sprintf("%p", dht)),
)
ctx, _ = tag.New(
Expand Down
28 changes: 28 additions & 0 deletions dht_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,3 +309,31 @@ func forceAddressUpdateProcessing(t *testing.T) Option {
return nil
}
}

// EnableOptimisticProvide enables an optimization that skips the last hops of the provide process.
// This works by using the network size estimator (which uses the keyspace density of queries)
// to optimistically send ADD_PROVIDER requests when we most likely have found the last hop.
// It will also run some ADD_PROVIDER requests asynchronously in the background after returning,
// this allows to optimistically return earlier if some threshold number of RPCs have succeeded.
// The number of background/in-flight queries can be configured with the OptimisticProvideJobsPoolSize
// option.
//
// EXPERIMENTAL: This is an experimental option and might be removed in the future. Use at your own risk.
func EnableOptimisticProvide() Option {
return func(c *dhtcfg.Config) error {
c.EnableOptimisticProvide = true
return nil
}
}

// OptimisticProvideJobsPoolSize allows to configure the asynchronicity limit for in-flight ADD_PROVIDER RPCs.
// It makes sense to set it to a multiple of optProvReturnRatio * BucketSize. Check the description of
// EnableOptimisticProvide for more details.
//
// EXPERIMENTAL: This is an experimental option and might be removed in the future. Use at your own risk.
func OptimisticProvideJobsPoolSize(size int) Option {
return func(c *dhtcfg.Config) error {
c.OptimisticProvideJobsPoolSize = size
return nil
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ require (
go.opentelemetry.io/otel v1.14.0
go.opentelemetry.io/otel/trace v1.14.0
go.uber.org/zap v1.24.0
gonum.org/v1/gonum v0.11.0
)

require (
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -917,6 +917,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gonum.org/v1/gonum v0.11.0 h1:f1IJhK4Km5tBJmaiJXtk/PkL4cdVX6J+tGiM187uT5E=
gonum.org/v1/gonum v0.11.0/go.mod h1:fSG4YDCxxUZQJ7rKsQrj0gMOg00Il0Z96/qMA4bVQhA=
google.golang.org/api v0.0.0-20180910000450-7ca32eb868bf/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
google.golang.org/api v0.0.0-20181030000543-1d582fd0359e/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
google.golang.org/api v0.1.0/go.mod h1:UGEZY7KEX120AnNLIHFMKIo4obdJhkp2tPbaPlQx13Y=
Expand Down
6 changes: 6 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ type Config struct {
// test specific Config options
DisableFixLowPeers bool
TestAddressUpdateProcessing bool

EnableOptimisticProvide bool
OptimisticProvideJobsPoolSize int
}

func EmptyQueryFilter(_ interface{}, ai peer.AddrInfo) bool { return true }
Expand Down Expand Up @@ -120,6 +123,9 @@ var Defaults = func(o *Config) error {
o.Concurrency = 10
o.Resiliency = 3

// MAGIC: It makes sense to set it to a multiple of OptProvReturnRatio * BucketSize. We chose a multiple of 4.
o.OptimisticProvideJobsPoolSize = 60

return nil
}

Expand Down
91 changes: 56 additions & 35 deletions lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"time"

"github.com/libp2p/go-libp2p-kad-dht/internal"
"github.com/libp2p/go-libp2p-kad-dht/metrics"
"github.com/libp2p/go-libp2p-kad-dht/qpeerset"
kb "github.com/libp2p/go-libp2p-kbucket"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
Expand All @@ -26,47 +28,66 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID,
if key == "" {
return nil, fmt.Errorf("can't lookup empty key")
}
// TODO: I can break the interface! return []peer.ID
lookupRes, err := dht.runLookupWithFollowup(ctx, key,
func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
// For DHT query command
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.SendingQuery,
ID: p,
})

mctx, mspan := internal.StartSpan(ctx, "protoMessenger.GetClosestPeers", trace.WithAttributes(attribute.Stringer("peer", p)))
peers, err := dht.protoMessenger.GetClosestPeers(mctx, p, peer.ID(key))
if err != nil {
if mspan.IsRecording() {
mspan.SetStatus(codes.Error, err.Error())
}
mspan.End()
logger.Debugf("error getting closer peers: %s", err)
return nil, err
}
mspan.End()

// For DHT query command
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.PeerResponse,
ID: p,
Responses: peers,
})

return peers, err
},
func() bool { return false },
)
//TODO: I can break the interface! return []peer.ID
lookupRes, err := dht.runLookupWithFollowup(ctx, key, dht.pmGetClosestPeers(key), func(*qpeerset.QueryPeerset) bool { return false })

if err != nil {
return nil, err
}

if ctx.Err() == nil && lookupRes.completed {
// refresh the cpl for this key as the query was successful
dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(key), time.Now())
if err := ctx.Err(); err != nil || !lookupRes.completed {
return lookupRes.peers, err
}

// tracking lookup results for network size estimator
if err = dht.nsEstimator.Track(key, lookupRes.closest); err != nil {
logger.Warnf("network size estimator track peers: %s", err)
}

if ns, err := dht.nsEstimator.NetworkSize(); err == nil {
metrics.NetworkSize.M(int64(ns))
}

return lookupRes.peers, ctx.Err()
// refresh the cpl for this key as the query was successful
dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(key), time.Now())

return lookupRes.peers, nil
}

// pmGetClosestPeers is the protocol messenger version of the GetClosestPeer queryFn.
func (dht *IpfsDHT) pmGetClosestPeers(key string) queryFn {
return func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
// For DHT query command
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.SendingQuery,
ID: p,
})

mctx, mspan := internal.StartSpan(ctx, "protoMessenger.GetClosestPeers", trace.WithAttributes(attribute.Stringer("peer", p)))
peers, err := dht.protoMessenger.GetClosestPeers(mctx, p, peer.ID(key))
if err != nil {
if mspan.IsRecording() {
mspan.SetStatus(codes.Error, err.Error())
}
mspan.End()
logger.Debugf("error getting closer peers: %s", err)
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.QueryError,
ID: p,
Extra: err.Error(),
})
return nil, err
}
mspan.End()

// For DHT query command
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.PeerResponse,
ID: p,
Responses: peers,
})

return peers, err
}
}
Loading