From 4682d4a5618cbe85122c3e5238d99eae0039295a Mon Sep 17 00:00:00 2001 From: Jorropo Date: Thu, 6 Apr 2023 17:21:58 +0200 Subject: [PATCH] feat: warn users who are falling behind reprovides Fixes: #9704 Fixes: #9702 Depends on: #273 --- core/commands/stat_provide.go | 17 +-- core/coreapi/coreapi.go | 2 +- core/node/groups.go | 8 +- core/node/provider.go | 203 +++++++++++++------------ docs/examples/kubo-as-a-library/go.mod | 1 - docs/examples/kubo-as-a-library/go.sum | 2 - go.mod | 5 +- go.sum | 10 +- 8 files changed, 120 insertions(+), 128 deletions(-) diff --git a/core/commands/stat_provide.go b/core/commands/stat_provide.go index f93f73cb4b8b..6ee51e516fa0 100644 --- a/core/commands/stat_provide.go +++ b/core/commands/stat_provide.go @@ -7,10 +7,10 @@ import ( "time" humanize "github.com/dustin/go-humanize" + "github.com/ipfs/boxo/provider" cmds "github.com/ipfs/go-ipfs-cmds" "github.com/ipfs/kubo/core/commands/cmdenv" - - "github.com/ipfs/boxo/provider/batched" + "golang.org/x/exp/constraints" ) var statProvideCmd = &cmds.Command{ @@ -34,12 +34,7 @@ This interface is not stable and may change from release to release. return ErrNotOnline } - sys, ok := nd.Provider.(*batched.BatchProvidingSystem) - if !ok { - return fmt.Errorf("can only return stats if Experimental.AcceleratedDHTClient is enabled") - } - - stats, err := sys.Stat(req.Context) + stats, err := nd.Provider.Stat() if err != nil { return err } @@ -51,7 +46,7 @@ This interface is not stable and may change from release to release. return nil }, Encoders: cmds.EncoderMap{ - cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, s *batched.BatchedProviderStats) error { + cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, s *provider.ReproviderStats) error { wtr := tabwriter.NewWriter(w, 1, 2, 1, ' ', 0) defer wtr.Flush() @@ -62,14 +57,14 @@ This interface is not stable and may change from release to release. return nil }), }, - Type: batched.BatchedProviderStats{}, + Type: provider.ReproviderStats{}, } func humanDuration(val time.Duration) string { return val.Truncate(time.Microsecond).String() } -func humanNumber(n int) string { +func humanNumber[T constraints.Float | constraints.Integer](n T) string { nf := float64(n) str := humanSI(nf, 0) fullStr := humanFull(nf, 0) diff --git a/core/coreapi/coreapi.go b/core/coreapi/coreapi.go index 6c97800a3384..5a7e321a9c6e 100644 --- a/core/coreapi/coreapi.go +++ b/core/coreapi/coreapi.go @@ -238,7 +238,7 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e return nil, fmt.Errorf("error constructing namesys: %w", err) } - subAPI.provider = provider.NewOfflineProvider() + subAPI.provider = provider.NewNoopProvider() subAPI.peerstore = nil subAPI.peerHost = nil diff --git a/core/node/groups.go b/core/node/groups.go index 5486788358cc..e6a570ab8d06 100644 --- a/core/node/groups.go +++ b/core/node/groups.go @@ -304,7 +304,6 @@ func Online(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.Part LibP2P(bcfg, cfg, userResourceOverrides), OnlineProviders( cfg.Experimental.StrategicProviding, - cfg.Experimental.AcceleratedDHTClient, cfg.Reprovider.Strategy.WithDefault(config.DefaultReproviderStrategy), cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval), ), @@ -320,12 +319,7 @@ func Offline(cfg *config.Config) fx.Option { fx.Provide(libp2p.Routing), fx.Provide(libp2p.ContentRouting), fx.Provide(libp2p.OfflineRouting), - OfflineProviders( - cfg.Experimental.StrategicProviding, - cfg.Experimental.AcceleratedDHTClient, - cfg.Reprovider.Strategy.WithDefault(config.DefaultReproviderStrategy), - cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval), - ), + OfflineProviders(), ) } diff --git a/core/node/provider.go b/core/node/provider.go index 58e74bdb3f89..a97a96f82e59 100644 --- a/core/node/provider.go +++ b/core/node/provider.go @@ -5,145 +5,150 @@ import ( "fmt" "time" + "github.com/ipfs/boxo/blockstore" "github.com/ipfs/boxo/fetcher" pin "github.com/ipfs/boxo/pinning/pinner" provider "github.com/ipfs/boxo/provider" - "github.com/ipfs/boxo/provider/batched" - q "github.com/ipfs/boxo/provider/queue" - "github.com/ipfs/boxo/provider/simple" - "go.uber.org/fx" - - "github.com/ipfs/kubo/core/node/helpers" "github.com/ipfs/kubo/repo" irouting "github.com/ipfs/kubo/routing" + "go.uber.org/fx" ) -// SIMPLE - -// ProviderQueue creates new datastore backed provider queue -func ProviderQueue(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo) (*q.Queue, error) { - return q.NewQueue(helpers.LifecycleCtx(mctx, lc), "provider-v1", repo.Datastore()) -} - -// SimpleProvider creates new record provider -func SimpleProvider(mctx helpers.MetricsCtx, lc fx.Lifecycle, queue *q.Queue, rt irouting.ProvideManyRouter) provider.Provider { - return simple.NewProvider(helpers.LifecycleCtx(mctx, lc), queue, rt) -} - -// SimpleReprovider creates new reprovider -func SimpleReprovider(reproviderInterval time.Duration) interface{} { - return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, rt irouting.ProvideManyRouter, keyProvider simple.KeyChanFunc) (provider.Reprovider, error) { - return simple.NewReprovider(helpers.LifecycleCtx(mctx, lc), reproviderInterval, rt, keyProvider), nil - } -} - -// SimpleProviderSys creates new provider system -func SimpleProviderSys(isOnline bool) interface{} { - return func(lc fx.Lifecycle, p provider.Provider, r provider.Reprovider) provider.System { - sys := provider.NewSystem(p, r) - - if isOnline { - lc.Append(fx.Hook{ - OnStart: func(ctx context.Context) error { - sys.Run() - return nil - }, - OnStop: func(ctx context.Context) error { - return sys.Close() - }, - }) - } - - return sys - } -} - -// BatchedProviderSys creates new provider system -func BatchedProviderSys(isOnline bool, reprovideInterval time.Duration) interface{} { - return func(lc fx.Lifecycle, cr irouting.ProvideManyRouter, q *q.Queue, keyProvider simple.KeyChanFunc, repo repo.Repo) (provider.System, error) { - sys, err := batched.New(cr, q, - batched.ReproviderInterval(reprovideInterval), - batched.Datastore(repo.Datastore()), - batched.KeyProvider(keyProvider)) +func ProviderSys(reprovideInterval time.Duration) fx.Option { + const magicThroughputReportCount = 128 + return fx.Provide(func(lc fx.Lifecycle, cr irouting.ProvideManyRouter, keyProvider provider.KeyChanFunc, repo repo.Repo, bs blockstore.Blockstore) (provider.System, error) { + sys, err := provider.New(repo.Datastore(), + provider.Online(cr), + provider.ReproviderInterval(reprovideInterval), + provider.KeyProvider(keyProvider), + provider.ThroughputReport(func(reprovide bool, complete bool, keysProvided uint, duration time.Duration) bool { + avgProvideSpeed := duration / time.Duration(keysProvided) + count := uint64(keysProvided) + + if !reprovide || !complete { + // We don't know how many CIDs we have to provide, try to fetch it from the blockstore. + // But don't try for too long as this might be very expensive if you have a huge datastore. + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) + defer cancel() + + // FIXME: I want a running counter of blocks so size of blockstore can be an O(1) lookup. + ch, err := bs.AllKeysChan(ctx) + if err != nil { + logger.Errorf("fetching AllKeysChain in provider ThroughputReport: %v", err) + return false + } + count = 0 + countLoop: + for { + select { + case _, ok := <-ch: + if !ok { + break countLoop + } + count++ + case <-ctx.Done(): + // really big blockstore mode + + // how many blocks would be in a 10TiB blockstore with 128KiB blocks. + const probableBigBlockstore = (10 * 1024 * 1024 * 1024 * 1024) / (128 * 1024) + // How long per block that lasts us. + expectedProvideSpeed := reprovideInterval / probableBigBlockstore + if avgProvideSpeed > expectedProvideSpeed { + // FIXME(@Jorropo): add link to the accelerated DHT client docs once this isn't experimental anymore. + logger.Errorf(` +🔔🔔🔔 YOU MAY BE FALLING BEHIND DHT REPROVIDES! 🔔🔔🔔 + +⚠️ Your system might be struggling to keep up with DHT reprovides! +Meaning your content being partially or completely inacessible on the network. +We observed that you recently provided %d keys at an average rate of %v per key. + +🕑 An attempt to estimate your blockstore size timed out after 5 minutes, +implying your blockstore might be exceedingly large. Assuming a considerable +size of 10TiB, it would take %v to provide the complete set. + +⏰ The total provide time needs to stay under 24 hours to prevent falling behind! + +💡 Consider enabling the Accelerated DHT to enhance your system performance.`, + keysProvided, avgProvideSpeed, avgProvideSpeed*probableBigBlockstore) + return false + } + } + } + } + + // How long per block that lasts us. + expectedProvideSpeed := reprovideInterval / time.Duration(count) + if avgProvideSpeed > expectedProvideSpeed { + // FIXME(@Jorropo): add link to the accelerated DHT client docs once this isn't experimental anymore. + logger.Errorf(` +🔔🔔🔔 YOU ARE FALLING BEHIND DHT REPROVIDES! 🔔🔔🔔 + +⚠️ Your system is struggling to keep up with DHT reprovides! +Meaning your content being partially or completely inacessible on the network. +We observed that you recently provided %d keys at an average rate of %v per key. + +💾 Your total CID count is ~%d which would total at %v reprovide process. + +⏰ The total provide time needs to stay under 24 hours to prevent falling behind! + +💡 Consider enabling the Accelerated DHT to enhance your reprovide throughput.`, + keysProvided, avgProvideSpeed, count, avgProvideSpeed*time.Duration(count)) + } + return false + }, magicThroughputReportCount), + ) if err != nil { return nil, err } - if isOnline { - lc.Append(fx.Hook{ - OnStart: func(ctx context.Context) error { - sys.Run() - return nil - }, - OnStop: func(ctx context.Context) error { - return sys.Close() - }, - }) - } + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + return sys.Close() + }, + }) return sys, nil - } + }) } // ONLINE/OFFLINE // OnlineProviders groups units managing provider routing records online -func OnlineProviders(useStrategicProviding bool, useBatchedProviding bool, reprovideStrategy string, reprovideInterval time.Duration) fx.Option { - if useStrategicProviding { - return fx.Provide(provider.NewOfflineProvider) - } - - return fx.Options( - SimpleProviders(reprovideStrategy, reprovideInterval), - maybeProvide(SimpleProviderSys(true), !useBatchedProviding), - maybeProvide(BatchedProviderSys(true, reprovideInterval), useBatchedProviding), - ) -} - -// OfflineProviders groups units managing provider routing records offline -func OfflineProviders(useStrategicProviding bool, useBatchedProviding bool, reprovideStrategy string, reprovideInterval time.Duration) fx.Option { +func OnlineProviders(useStrategicProviding bool, reprovideStrategy string, reprovideInterval time.Duration) fx.Option { if useStrategicProviding { - return fx.Provide(provider.NewOfflineProvider) + return OfflineProviders() } - return fx.Options( - SimpleProviders(reprovideStrategy, reprovideInterval), - maybeProvide(SimpleProviderSys(false), true), - //maybeProvide(BatchedProviderSys(false, reprovideInterval), useBatchedProviding), - ) -} - -// SimpleProviders creates the simple provider/reprovider dependencies -func SimpleProviders(reprovideStrategy string, reproviderInterval time.Duration) fx.Option { var keyProvider fx.Option switch reprovideStrategy { - case "all": - fallthrough - case "": - keyProvider = fx.Provide(simple.NewBlockstoreProvider) + case "all", "": + keyProvider = fx.Provide(provider.NewBlockstoreProvider) case "roots": keyProvider = fx.Provide(pinnedProviderStrategy(true)) case "pinned": keyProvider = fx.Provide(pinnedProviderStrategy(false)) default: - return fx.Error(fmt.Errorf("unknown reprovider strategy '%s'", reprovideStrategy)) + return fx.Error(fmt.Errorf("unknown reprovider strategy %q", reprovideStrategy)) } return fx.Options( - fx.Provide(ProviderQueue), - fx.Provide(SimpleProvider), keyProvider, - fx.Provide(SimpleReprovider(reproviderInterval)), + ProviderSys(reprovideInterval), ) } +// OfflineProviders groups units managing provider routing records offline +func OfflineProviders() fx.Option { + return fx.Provide(provider.NewNoopProvider) +} + func pinnedProviderStrategy(onlyRoots bool) interface{} { type input struct { fx.In Pinner pin.Pinner IPLDFetcher fetcher.Factory `name:"ipldFetcher"` } - return func(in input) simple.KeyChanFunc { - return simple.NewPinnedProvider(onlyRoots, in.Pinner, in.IPLDFetcher) + return func(in input) provider.KeyChanFunc { + return provider.NewPinnedProvider(onlyRoots, in.Pinner, in.IPLDFetcher) } } diff --git a/docs/examples/kubo-as-a-library/go.mod b/docs/examples/kubo-as-a-library/go.mod index 52e3c064699c..b44244e1c164 100644 --- a/docs/examples/kubo-as-a-library/go.mod +++ b/docs/examples/kubo-as-a-library/go.mod @@ -21,7 +21,6 @@ require ( github.com/benbjohnson/clock v1.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/blang/semver/v4 v4.0.0 // indirect - github.com/cenkalti/backoff v2.2.1+incompatible // indirect github.com/cenkalti/backoff/v4 v4.2.0 // indirect github.com/ceramicnetwork/go-dag-jose v0.1.0 // indirect github.com/cespare/xxhash v1.1.0 // indirect diff --git a/docs/examples/kubo-as-a-library/go.sum b/docs/examples/kubo-as-a-library/go.sum index 3111d683f532..aa5dde56b659 100644 --- a/docs/examples/kubo-as-a-library/go.sum +++ b/docs/examples/kubo-as-a-library/go.sum @@ -74,8 +74,6 @@ github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY= github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs= github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= -github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= -github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cenkalti/backoff/v4 v4.2.0 h1:HN5dHm3WBOgndBH6E8V0q2jIYIR3s9yglV8k/+MN3u4= github.com/cenkalti/backoff/v4 v4.2.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= diff --git a/go.mod b/go.mod index a756f0f32068..e4bda7daa4d2 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/gogo/protobuf v1.3.2 github.com/google/uuid v1.3.0 github.com/hashicorp/go-multierror v1.1.1 - github.com/ipfs/boxo v0.8.2-0.20230510114019-33e3f0cd052b + github.com/ipfs/boxo v0.8.2-0.20230515125142-da153f4690bd github.com/ipfs/go-block-format v0.1.2 github.com/ipfs/go-cid v0.4.1 github.com/ipfs/go-cidutil v0.1.0 @@ -81,6 +81,7 @@ require ( go.uber.org/fx v1.19.2 go.uber.org/zap v1.24.0 golang.org/x/crypto v0.9.0 + golang.org/x/exp v0.0.0-20230321023759-10a507213a29 golang.org/x/mod v0.10.0 golang.org/x/sync v0.1.0 golang.org/x/sys v0.8.0 @@ -92,7 +93,6 @@ require ( github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 // indirect github.com/beorn7/perks v1.0.1 // indirect - github.com/cenkalti/backoff v2.2.1+incompatible // indirect github.com/cespare/xxhash v1.1.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/containerd/cgroups v1.1.0 // indirect @@ -215,7 +215,6 @@ require ( go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.11.0 // indirect go4.org v0.0.0-20230225012048-214862532bf5 // indirect - golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect golang.org/x/net v0.10.0 // indirect golang.org/x/oauth2 v0.5.0 // indirect golang.org/x/term v0.8.0 // indirect diff --git a/go.sum b/go.sum index af7ec2fc01ae..b6b46630317e 100644 --- a/go.sum +++ b/go.sum @@ -84,8 +84,6 @@ github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY= github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs= github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= -github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= -github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cenkalti/backoff/v4 v4.2.0 h1:HN5dHm3WBOgndBH6E8V0q2jIYIR3s9yglV8k/+MN3u4= github.com/cenkalti/backoff/v4 v4.2.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -356,8 +354,12 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1: github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= -github.com/ipfs/boxo v0.8.2-0.20230510114019-33e3f0cd052b h1:6EVpfwbBgwhfZOA19i55jOGokKOy+OaQAm1dg4RbXmc= -github.com/ipfs/boxo v0.8.2-0.20230510114019-33e3f0cd052b/go.mod h1:Ej2r08Z4VIaFKqY08UXMNhwcLf6VekHhK8c+KqA1B9Y= +github.com/ipfs/boxo v0.8.2-0.20230511175036-cedadbf667d7 h1:Nbe1zkYEMv81oRfJJH2kCyXr+ENuR+mbTko7/jvXEs8= +github.com/ipfs/boxo v0.8.2-0.20230511175036-cedadbf667d7/go.mod h1:LbIQUU7IEKw2AMm8sCLgJ/hljd/4yyiMtPzc68nveOc= +github.com/ipfs/boxo v0.8.2-0.20230511183253-30d1ef9a3fa6 h1:qaocp22F74W6YHDEE4qxS5Zn2b6lDN5mZeBbIVAgqFA= +github.com/ipfs/boxo v0.8.2-0.20230511183253-30d1ef9a3fa6/go.mod h1:LbIQUU7IEKw2AMm8sCLgJ/hljd/4yyiMtPzc68nveOc= +github.com/ipfs/boxo v0.8.2-0.20230515125142-da153f4690bd h1:2WrEH+ymhrJK7G0ouf6GhUvFTY7fBr0labD4+DHmgok= +github.com/ipfs/boxo v0.8.2-0.20230515125142-da153f4690bd/go.mod h1:LbIQUU7IEKw2AMm8sCLgJ/hljd/4yyiMtPzc68nveOc= github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA= github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU= github.com/ipfs/go-block-format v0.0.2/go.mod h1:AWR46JfpcObNfg3ok2JHDUfdiHRgWhJgCQF+KIgOPJY=