Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reprovider: Use goprocess #6248

Merged
merged 2 commits into from
Apr 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/ipfs/go-ipfs/core/bootstrap"
"github.com/ipfs/go-ipfs/core/node"
"github.com/ipfs/go-ipfs/core/node/libp2p"
rp "github.com/ipfs/go-ipfs/exchange/reprovide"
"github.com/ipfs/go-ipfs/filestore"
"github.com/ipfs/go-ipfs/fuse/mount"
"github.com/ipfs/go-ipfs/namesys"
Expand All @@ -28,6 +27,7 @@ import (
"github.com/ipfs/go-ipfs/pin"
"github.com/ipfs/go-ipfs/provider"
"github.com/ipfs/go-ipfs/repo"
rp "github.com/ipfs/go-ipfs/reprovide"

bserv "github.com/ipfs/go-blockservice"
bstore "github.com/ipfs/go-ipfs-blockstore"
Expand Down Expand Up @@ -69,9 +69,9 @@ type IpfsNode struct {
Repo repo.Repo

// Local node
Pinning pin.Pinner // the pinning manager
Mounts Mounts `optional:"true"` // current mount state, if any.
PrivateKey ic.PrivKey // the local node's private Key
Pinning pin.Pinner // the pinning manager
Mounts Mounts `optional:"true"` // current mount state, if any.
PrivateKey ic.PrivKey // the local node's private Key
PNetFingerprint libp2p.PNetFingerprint `optional:"true"` // fingerprint of private network

// Services
Expand Down
1 change: 0 additions & 1 deletion core/node/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,3 @@ func Files(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format.

return root, err
}

28 changes: 14 additions & 14 deletions core/node/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import (
"go.uber.org/fx"

"github.com/ipfs/go-ipfs/core/node/helpers"
"github.com/ipfs/go-ipfs/exchange/reprovide"
"github.com/ipfs/go-ipfs/pin"
"github.com/ipfs/go-ipfs/provider"
"github.com/ipfs/go-ipfs/repo"
"github.com/ipfs/go-ipfs/reprovide"
)

const kReprovideFrequency = time.Hour * 12
Expand Down Expand Up @@ -45,6 +45,16 @@ func ProviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, queue *provider.Queu
func ReproviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, cfg *config.Config, bs BaseBlocks, ds format.DAGService, pinning pin.Pinner, rt routing.IpfsRouting) (*reprovide.Reprovider, error) {
var keyProvider reprovide.KeyChanFunc

reproviderInterval := kReprovideFrequency
if cfg.Reprovider.Interval != "" {
dur, err := time.ParseDuration(cfg.Reprovider.Interval)
if err != nil {
return nil, err
}

reproviderInterval = dur
}

switch cfg.Reprovider.Strategy {
case "all":
fallthrough
Expand All @@ -57,21 +67,11 @@ func ReproviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, cfg *config.Config
default:
return nil, fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy)
}
return reprovide.NewReprovider(helpers.LifecycleCtx(mctx, lc), rt, keyProvider), nil
return reprovide.NewReprovider(helpers.LifecycleCtx(mctx, lc), reproviderInterval, rt, keyProvider), nil
}

// Reprovider runs the reprovider service
func Reprovider(cfg *config.Config, reprovider *reprovide.Reprovider) error {
reproviderInterval := kReprovideFrequency
if cfg.Reprovider.Interval != "" {
dur, err := time.ParseDuration(cfg.Reprovider.Interval)
if err != nil {
return err
}

reproviderInterval = dur
}

go reprovider.Run(reproviderInterval) // TODO: refactor reprovider to have Start/Stop, use lifecycle
func Reprovider(lp lcProcess, reprovider *reprovide.Reprovider) error {
lp.Append(reprovider.Run)
return nil
}
File renamed without changes.
46 changes: 29 additions & 17 deletions exchange/reprovide/reprovide.go → reprovide/reprovide.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,67 +2,77 @@ package reprovide

import (
"context"
"errors"
"fmt"
"time"

backoff "github.com/cenkalti/backoff"
cid "github.com/ipfs/go-cid"
"github.com/cenkalti/backoff"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
"github.com/ipfs/go-verifcid"
"github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"
routing "github.com/libp2p/go-libp2p-routing"
)

var log = logging.Logger("reprovider")

//KeyChanFunc is function streaming CIDs to pass to content routing
// KeyChanFunc is function streaming CIDs to pass to content routing
type KeyChanFunc func(context.Context) (<-chan cid.Cid, error)
type doneFunc func(error)

type Reprovider struct {
ctx context.Context
trigger chan doneFunc
closing chan struct{}

// The routing system to provide values through
rsys routing.ContentRouting

keyProvider KeyChanFunc
tick time.Duration
}

// NewReprovider creates new Reprovider instance.
func NewReprovider(ctx context.Context, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider {
func NewReprovider(ctx context.Context, tick time.Duration, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider {
return &Reprovider{
ctx: ctx,
trigger: make(chan doneFunc),
closing: make(chan struct{}),

rsys: rsys,
keyProvider: keyProvider,
tick: tick,
}
}

// Run re-provides keys with 'tick' interval or when triggered
func (rp *Reprovider) Run(tick time.Duration) {
func (rp *Reprovider) Run(proc goprocess.Process) {
ctx := goprocessctx.WithProcessClosing(rp.ctx, proc)
defer close(rp.closing)

// dont reprovide immediately.
// may have just started the daemon and shutting it down immediately.
// probability( up another minute | uptime ) increases with uptime.
after := time.After(time.Minute)
var done doneFunc
for {
if tick == 0 {
if rp.tick == 0 {
after = make(chan time.Time)
}

select {
case <-rp.ctx.Done():
case <-ctx.Done():
return
case done = <-rp.trigger:
case <-after:
}

//'mute' the trigger channel so when `ipfs bitswap reprovide` is called
//a 'reprovider is already running' error is returned
// 'mute' the trigger channel so when `ipfs bitswap reprovide` is called
// a 'reprovider is already running' error is returned
unmute := rp.muteTrigger()

err := rp.Reprovide()
err := rp.reprovide(ctx)
if err != nil {
log.Debug(err)
}
Expand All @@ -73,13 +83,13 @@ func (rp *Reprovider) Run(tick time.Duration) {

unmute()

after = time.After(tick)
after = time.After(rp.tick)
}
}

// Reprovide registers all keys given by rp.keyProvider to libp2p content routing
func (rp *Reprovider) Reprovide() error {
keychan, err := rp.keyProvider(rp.ctx)
// reprovide registers all keys given by rp.keyProvider to libp2p content routing
func (rp *Reprovider) reprovide(ctx context.Context) error {
keychan, err := rp.keyProvider(ctx)
if err != nil {
return fmt.Errorf("failed to get key chan: %s", err)
}
Expand All @@ -90,7 +100,7 @@ func (rp *Reprovider) Reprovide() error {
continue
}
op := func() error {
err := rp.rsys.Provide(rp.ctx, c, true)
err := rp.rsys.Provide(ctx, c, true)
if err != nil {
log.Debugf("Failed to provide key: %s", err)
}
Expand Down Expand Up @@ -119,10 +129,12 @@ func (rp *Reprovider) Trigger(ctx context.Context) error {
}

select {
case <-rp.closing:
return errors.New("reprovider is closed")
case <-rp.ctx.Done():
return context.Canceled
return rp.ctx.Err()
case <-ctx.Done():
return context.Canceled
return ctx.Err()
case rp.trigger <- df:
<-progressCtx.Done()
return err
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package reprovide_test
package reprovide

import (
"context"
Expand All @@ -10,9 +10,7 @@ import (
blockstore "github.com/ipfs/go-ipfs-blockstore"
mock "github.com/ipfs/go-ipfs-routing/mock"
pstore "github.com/libp2p/go-libp2p-peerstore"
testutil "github.com/libp2p/go-testutil"

. "github.com/ipfs/go-ipfs/exchange/reprovide"
"github.com/libp2p/go-testutil"
)

func TestReprovide(t *testing.T) {
Expand All @@ -36,8 +34,8 @@ func TestReprovide(t *testing.T) {
}

keyProvider := NewBlockstoreProvider(bstore)
reprov := NewReprovider(ctx, clA, keyProvider)
err = reprov.Reprovide()
reprov := NewReprovider(ctx, 0, clA, keyProvider)
err = reprov.reprovide(ctx)
if err != nil {
t.Fatal(err)
}
Expand Down