Skip to content

Commit

Permalink
feat: add experimental optimistic provide
Browse files Browse the repository at this point in the history
This adds the ability to enable "optimistic provide" to the default
DHT client, which enables faster provides and reprovides.

For more information about optimistic provide, see:

https://protocollabs.notion.site/Optimistic-Provide-2c79745820fa45649d48de038516b814

Note that this feature only works when using non-custom router
types. This does not include the ability to enable optimistic provide
on custom routers for now, to minimize the footprint of this
experimental feature. We intend on continuing to test this and improve
the UX, which may or may not involve adding configuration for it to
custom routers. We also plan on refactoring/redesigning custom routers
more broadly so I don't want this to add more effort for maintainers
and confusion for users.
  • Loading branch information
guseggert committed Apr 6, 2023
1 parent c6a59c9 commit 3fc5e08
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 92 deletions.
18 changes: 10 additions & 8 deletions config/experiments.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package config

type Experiments struct {
FilestoreEnabled bool
UrlstoreEnabled bool
ShardingEnabled bool `json:",omitempty"` // deprecated by autosharding: https://github.com/ipfs/kubo/pull/8527
GraphsyncEnabled bool
Libp2pStreamMounting bool
P2pHttpProxy bool //nolint
StrategicProviding bool
AcceleratedDHTClient bool
FilestoreEnabled bool
UrlstoreEnabled bool
ShardingEnabled bool `json:",omitempty"` // deprecated by autosharding: https://github.com/ipfs/kubo/pull/8527
GraphsyncEnabled bool
Libp2pStreamMounting bool
P2pHttpProxy bool //nolint
StrategicProviding bool
AcceleratedDHTClient bool
OptimisticProvide bool
OptimisticProvideJobsPoolSize int
}
21 changes: 14 additions & 7 deletions core/node/libp2p/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,18 @@ func Host(mctx helpers.MetricsCtx, lc fx.Lifecycle, params P2PHostIn) (out P2PHo
return out, err
}

routingOptArgs := RoutingOptionArgs{
Ctx: ctx,
Datastore: params.Repo.Datastore(),
Validator: params.Validator,
BootstrapPeers: bootstrappers,
OptimisticProvide: cfg.Experimental.OptimisticProvide,
OptimisticProvideJobsPoolSize: cfg.Experimental.OptimisticProvideJobsPoolSize,
}
opts = append(opts, libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
r, err := params.RoutingOption(
ctx, h,
params.Repo.Datastore(),
params.Validator,
bootstrappers...,
)
args := routingOptArgs
args.Host = h
r, err := params.RoutingOption(args)
out.Routing = r
return r, err
}))
Expand All @@ -69,10 +74,12 @@ func Host(mctx helpers.MetricsCtx, lc fx.Lifecycle, params P2PHostIn) (out P2PHo
return P2PHostOut{}, err
}

routingOptArgs.Host = out.Host

// this code is necessary just for tests: mock network constructions
// ignore the libp2p constructor options that actually construct the routing!
if out.Routing == nil {
r, err := params.RoutingOption(ctx, out.Host, params.Repo.Datastore(), params.Validator, bootstrappers...)
r, err := params.RoutingOption(routingOptArgs)
if err != nil {
return P2PHostOut{}, err
}
Expand Down
108 changes: 39 additions & 69 deletions core/node/libp2p/routingopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@ import (
routing "github.com/libp2p/go-libp2p/core/routing"
)

type RoutingOption func(
context.Context,
host.Host,
datastore.Batching,
record.Validator,
...peer.AddrInfo,
) (routing.Routing, error)
type RoutingOptionArgs struct {
Ctx context.Context
Host host.Host
Datastore datastore.Batching
Validator record.Validator
BootstrapPeers []peer.AddrInfo
OptimisticProvide bool
OptimisticProvideJobsPoolSize int
}

type RoutingOption func(args RoutingOptionArgs) (routing.Routing, error)

// Default HTTP routers used in parallel to DHT when Routing.Type = "auto"
var defaultHTTPRouters = []string{
Expand All @@ -40,25 +44,13 @@ func init() {
}

// ConstructDefaultRouting returns routers used when Routing.Type is unset or set to "auto"
func ConstructDefaultRouting(peerID string, addrs []string, privKey string, routingOpt RoutingOption) func(
ctx context.Context,
host host.Host,
dstore datastore.Batching,
validator record.Validator,
bootstrapPeers ...peer.AddrInfo,
) (routing.Routing, error) {
return func(
ctx context.Context,
host host.Host,
dstore datastore.Batching,
validator record.Validator,
bootstrapPeers ...peer.AddrInfo,
) (routing.Routing, error) {
func ConstructDefaultRouting(peerID string, addrs []string, privKey string, routingOpt RoutingOption) RoutingOption {
return func(args RoutingOptionArgs) (routing.Routing, error) {
// Defined routers will be queried in parallel (optimizing for response speed)
// Different trade-offs can be made by setting Routing.Type = "custom" with own Routing.Routers
var routers []*routinghelpers.ParallelRouter

dhtRouting, err := routingOpt(ctx, host, dstore, validator, bootstrapPeers...)
dhtRouting, err := routingOpt(args)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -97,54 +89,38 @@ func ConstructDefaultRouting(peerID string, addrs []string, privKey string, rout
}

// constructDHTRouting is used when Routing.Type = "dht"
func constructDHTRouting(mode dht.ModeOpt) func(
ctx context.Context,
host host.Host,
dstore datastore.Batching,
validator record.Validator,
bootstrapPeers ...peer.AddrInfo,
) (routing.Routing, error) {
return func(
ctx context.Context,
host host.Host,
dstore datastore.Batching,
validator record.Validator,
bootstrapPeers ...peer.AddrInfo,
) (routing.Routing, error) {
func constructDHTRouting(mode dht.ModeOpt) RoutingOption {
return func(args RoutingOptionArgs) (routing.Routing, error) {
dhtOpts := []dht.Option{
dht.Concurrency(10),
dht.Mode(mode),
dht.Datastore(args.Datastore),
dht.Validator(args.Validator),
}
if args.OptimisticProvide {
dhtOpts = append(dhtOpts, dht.EnableOptimisticProvide())
}
if args.OptimisticProvideJobsPoolSize != 0 {
dhtOpts = append(dhtOpts, dht.OptimisticProvideJobsPoolSize(args.OptimisticProvideJobsPoolSize))
}
return dual.New(
ctx, host,
dual.DHTOption(
dht.Concurrency(10),
dht.Mode(mode),
dht.Datastore(dstore),
dht.Validator(validator)),
dual.WanDHTOption(dht.BootstrapPeers(bootstrapPeers...)),
args.Ctx, args.Host,
dual.DHTOption(dhtOpts...),
dual.WanDHTOption(dht.BootstrapPeers(args.BootstrapPeers...)),
)
}
}

// ConstructDelegatedRouting is used when Routing.Type = "custom"
func ConstructDelegatedRouting(routers config.Routers, methods config.Methods, peerID string, addrs []string, privKey string) func(
ctx context.Context,
host host.Host,
dstore datastore.Batching,
validator record.Validator,
bootstrapPeers ...peer.AddrInfo,
) (routing.Routing, error) {
return func(
ctx context.Context,
host host.Host,
dstore datastore.Batching,
validator record.Validator,
bootstrapPeers ...peer.AddrInfo,
) (routing.Routing, error) {
func ConstructDelegatedRouting(routers config.Routers, methods config.Methods, peerID string, addrs []string, privKey string) RoutingOption {
return func(args RoutingOptionArgs) (routing.Routing, error) {
return irouting.Parse(routers, methods,
&irouting.ExtraDHTParams{
BootstrapPeers: bootstrapPeers,
Host: host,
Validator: validator,
Datastore: dstore,
Context: ctx,
BootstrapPeers: args.BootstrapPeers,
Host: args.Host,
Validator: args.Validator,
Datastore: args.Datastore,
Context: args.Ctx,
},
&irouting.ExtraHTTPParams{
PeerID: peerID,
Expand All @@ -154,13 +130,7 @@ func ConstructDelegatedRouting(routers config.Routers, methods config.Methods, p
}
}

func constructNilRouting(
ctx context.Context,
host host.Host,
dstore datastore.Batching,
validator record.Validator,
bootstrapPeers ...peer.AddrInfo,
) (routing.Routing, error) {
func constructNilRouting(_ RoutingOptionArgs) (routing.Routing, error) {
return routinghelpers.Null{}, nil
}

Expand Down
3 changes: 2 additions & 1 deletion docs/examples/kubo-as-a-library/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ require (
github.com/libp2p/go-doh-resolver v0.4.0 // indirect
github.com/libp2p/go-flow-metrics v0.1.0 // indirect
github.com/libp2p/go-libp2p-asn-util v0.3.0 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.22.0 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.23.0 // indirect
github.com/libp2p/go-libp2p-kbucket v0.5.0 // indirect
github.com/libp2p/go-libp2p-pubsub v0.9.3 // indirect
github.com/libp2p/go-libp2p-pubsub-router v0.6.0 // indirect
Expand Down Expand Up @@ -188,6 +188,7 @@ require (
golang.org/x/text v0.8.0 // indirect
golang.org/x/tools v0.6.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gonum.org/v1/gonum v0.11.0 // indirect
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect
google.golang.org/grpc v1.53.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
Expand Down
6 changes: 4 additions & 2 deletions docs/examples/kubo-as-a-library/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -518,8 +518,8 @@ github.com/libp2p/go-libp2p-asn-util v0.3.0 h1:gMDcMyYiZKkocGXDQ5nsUQyquC9+H+iLE
github.com/libp2p/go-libp2p-asn-util v0.3.0/go.mod h1:B1mcOrKUE35Xq/ASTmQ4tN3LNzVVaMNmq2NACuqyB9w=
github.com/libp2p/go-libp2p-core v0.2.4/go.mod h1:STh4fdfa5vDYr0/SzYYeqnt+E6KfEV5VxfIrm0bcI0g=
github.com/libp2p/go-libp2p-core v0.3.0/go.mod h1:ACp3DmS3/N64c2jDzcV429ukDpicbL6+TrrxANBjPGw=
github.com/libp2p/go-libp2p-kad-dht v0.22.0 h1:cW2nGgG0hztDM42tOPyC5cVflD7EzLaHM0/Kjol6Wio=
github.com/libp2p/go-libp2p-kad-dht v0.22.0/go.mod h1:hareSo3Z/GJ7nUWPMj7XhD/56a7+rRltYCWwCuy3FQk=
github.com/libp2p/go-libp2p-kad-dht v0.23.0 h1:sxE6LxLopp79eLeV695n7+c77V/Vn4AMF28AdM/XFqM=
github.com/libp2p/go-libp2p-kad-dht v0.23.0/go.mod h1:oO5N308VT2msnQI6qi5M61wzPmJYg7Tr9e16m5n7uDU=
github.com/libp2p/go-libp2p-kbucket v0.3.1/go.mod h1:oyjT5O7tS9CQurok++ERgc46YLwEpuGoFq9ubvoUOio=
github.com/libp2p/go-libp2p-kbucket v0.5.0 h1:g/7tVm8ACHDxH29BGrpsQlnNeu+6OF1A9bno/4/U1oA=
github.com/libp2p/go-libp2p-kbucket v0.5.0/go.mod h1:zGzGCpQd78b5BNTDGHNDLaTt9aDK/A02xeZp9QeFC4U=
Expand Down Expand Up @@ -1191,6 +1191,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
gonum.org/v1/gonum v0.11.0 h1:f1IJhK4Km5tBJmaiJXtk/PkL4cdVX6J+tGiM187uT5E=
gonum.org/v1/gonum v0.11.0/go.mod h1:fSG4YDCxxUZQJ7rKsQrj0gMOg00Il0Z96/qMA4bVQhA=
google.golang.org/api v0.0.0-20180910000450-7ca32eb868bf/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
google.golang.org/api v0.0.0-20181030000543-1d582fd0359e/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
google.golang.org/api v0.1.0/go.mod h1:UGEZY7KEX120AnNLIHFMKIo4obdJhkp2tPbaPlQx13Y=
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ require (
github.com/libp2p/go-doh-resolver v0.4.0
github.com/libp2p/go-libp2p v0.26.4
github.com/libp2p/go-libp2p-http v0.5.0
github.com/libp2p/go-libp2p-kad-dht v0.22.0
github.com/libp2p/go-libp2p-kad-dht v0.23.0
github.com/libp2p/go-libp2p-kbucket v0.5.0
github.com/libp2p/go-libp2p-pubsub v0.9.3
github.com/libp2p/go-libp2p-pubsub-router v0.6.0
Expand Down Expand Up @@ -222,6 +222,7 @@ require (
golang.org/x/text v0.8.0 // indirect
golang.org/x/tools v0.6.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gonum.org/v1/gonum v0.11.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect
google.golang.org/grpc v1.53.0 // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -549,8 +549,8 @@ github.com/libp2p/go-libp2p-gostream v0.6.0 h1:QfAiWeQRce6pqnYfmIVWJFXNdDyfiR/qk
github.com/libp2p/go-libp2p-gostream v0.6.0/go.mod h1:Nywu0gYZwfj7Jc91PQvbGU8dIpqbQQkjWgDuOrFaRdA=
github.com/libp2p/go-libp2p-http v0.5.0 h1:+x0AbLaUuLBArHubbbNRTsgWz0RjNTy6DJLOxQ3/QBc=
github.com/libp2p/go-libp2p-http v0.5.0/go.mod h1:glh87nZ35XCQyFsdzZps6+F4HYI6DctVFY5u1fehwSg=
github.com/libp2p/go-libp2p-kad-dht v0.22.0 h1:cW2nGgG0hztDM42tOPyC5cVflD7EzLaHM0/Kjol6Wio=
github.com/libp2p/go-libp2p-kad-dht v0.22.0/go.mod h1:hareSo3Z/GJ7nUWPMj7XhD/56a7+rRltYCWwCuy3FQk=
github.com/libp2p/go-libp2p-kad-dht v0.23.0 h1:sxE6LxLopp79eLeV695n7+c77V/Vn4AMF28AdM/XFqM=
github.com/libp2p/go-libp2p-kad-dht v0.23.0/go.mod h1:oO5N308VT2msnQI6qi5M61wzPmJYg7Tr9e16m5n7uDU=
github.com/libp2p/go-libp2p-kbucket v0.3.1/go.mod h1:oyjT5O7tS9CQurok++ERgc46YLwEpuGoFq9ubvoUOio=
github.com/libp2p/go-libp2p-kbucket v0.5.0 h1:g/7tVm8ACHDxH29BGrpsQlnNeu+6OF1A9bno/4/U1oA=
github.com/libp2p/go-libp2p-kbucket v0.5.0/go.mod h1:zGzGCpQd78b5BNTDGHNDLaTt9aDK/A02xeZp9QeFC4U=
Expand Down Expand Up @@ -1282,6 +1282,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
gonum.org/v1/gonum v0.11.0 h1:f1IJhK4Km5tBJmaiJXtk/PkL4cdVX6J+tGiM187uT5E=
gonum.org/v1/gonum v0.11.0/go.mod h1:fSG4YDCxxUZQJ7rKsQrj0gMOg00Il0Z96/qMA4bVQhA=
google.golang.org/api v0.0.0-20180910000450-7ca32eb868bf/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
google.golang.org/api v0.0.0-20181030000543-1d582fd0359e/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
google.golang.org/api v0.1.0/go.mod h1:UGEZY7KEX120AnNLIHFMKIo4obdJhkp2tPbaPlQx13Y=
Expand Down
30 changes: 30 additions & 0 deletions test/cli/dht_opt_prov_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package cli

import (
"testing"

"github.com/ipfs/kubo/config"
"github.com/ipfs/kubo/test/cli/harness"
"github.com/ipfs/kubo/test/cli/testutils"
"github.com/stretchr/testify/assert"
)

func TestDHTOptimisticProvide(t *testing.T) {
t.Parallel()

t.Run("optimistic provide smoke test", func(t *testing.T) {
nodes := harness.NewT(t).NewNodes(2).Init()

nodes[0].UpdateConfig(func(cfg *config.Config) {
cfg.Experimental.OptimisticProvide = true
})

nodes.StartDaemons().Connect()

hash := nodes[0].IPFSAddStr(testutils.RandomStr(100))
nodes[0].IPFS("dht", "provide", hash)

res := nodes[1].IPFS("routing", "findprovs", "--num-providers=1", hash)
assert.Equal(t, nodes[0].PeerID().String(), res.Stdout.Trimmed())
})
}
4 changes: 2 additions & 2 deletions test/cli/harness/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ func (n *Node) Init(ipfsArgs ...string) *Node {
//
// node.StartDaemonWithReq(harness.RunRequest{
// CmdOpts: []harness.CmdOpt{
// harness.RunWithStderr(os.Stdout),
// harness.RunWithStdout(os.Stdout),
// harness.RunWithStderr(os.Stdout),
// harness.RunWithStdout(os.Stdout),
// },
// })
func (n *Node) StartDaemonWithReq(req RunRequest) *Node {
Expand Down
4 changes: 4 additions & 0 deletions test/cli/testutils/random.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,7 @@ func RandomBytes(n int) []byte {
}
return bytes
}

func RandomStr(n int) string {
return string(RandomBytes(n))
}

0 comments on commit 3fc5e08

Please sign in to comment.