From db228e1054f1b26b8d45eb860ef4106303f89e4f Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 19 Aug 2016 17:45:49 -0700 Subject: [PATCH] pin: use separate dagservice for storing pinsets License: MIT Signed-off-by: Jeromy --- core/builder.go | 6 ++++-- merkledag/merkledag_test.go | 25 ++----------------------- pin/pin.go | 23 ++++++++++++----------- pin/pin_test.go | 10 +++++----- 4 files changed, 23 insertions(+), 41 deletions(-) diff --git a/core/builder.go b/core/builder.go index db282748a5e..8de64b8804a 100644 --- a/core/builder.go +++ b/core/builder.go @@ -171,13 +171,15 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { n.Blocks = bserv.New(n.Blockstore, n.Exchange) n.DAG = dag.NewDAGService(n.Blocks) - n.Pinning, err = pin.LoadPinner(n.Repo.Datastore(), n.DAG) + + internalDag := dag.NewDAGService(bserv.New(n.Blockstore, offline.Exchange(n.Blockstore))) + n.Pinning, err = pin.LoadPinner(n.Repo.Datastore(), n.DAG, internalDag) if err != nil { // TODO: we should move towards only running 'NewPinner' explicity on // node init instead of implicitly here as a result of the pinner keys // not being found in the datastore. // this is kinda sketchy and could cause data loss - n.Pinning = pin.NewPinner(n.Repo.Datastore(), n.DAG) + n.Pinning = pin.NewPinner(n.Repo.Datastore(), n.DAG, internalDag) } n.Resolver = &path.Resolver{DAG: n.DAG} diff --git a/merkledag/merkledag_test.go b/merkledag/merkledag_test.go index dcf9ced1cab..05ba260f10a 100644 --- a/merkledag/merkledag_test.go +++ b/merkledag/merkledag_test.go @@ -10,7 +10,6 @@ import ( "sync" "testing" - bstore "github.com/ipfs/go-ipfs/blocks/blockstore" key "github.com/ipfs/go-ipfs/blocks/key" bserv "github.com/ipfs/go-ipfs/blockservice" bstest "github.com/ipfs/go-ipfs/blockservice/test" @@ -19,31 +18,11 @@ import ( chunk "github.com/ipfs/go-ipfs/importer/chunk" . "github.com/ipfs/go-ipfs/merkledag" dstest "github.com/ipfs/go-ipfs/merkledag/test" - "github.com/ipfs/go-ipfs/pin" uio "github.com/ipfs/go-ipfs/unixfs/io" - ds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore" - dssync "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore/sync" u "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util" "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" ) -type dagservAndPinner struct { - ds DAGService - mp pin.Pinner -} - -func getDagservAndPinner(t *testing.T) dagservAndPinner { - db := dssync.MutexWrap(ds.NewMapDatastore()) - bs := bstore.NewBlockstore(db) - blockserv := bserv.New(bs, offline.Exchange(bs)) - dserv := NewDAGService(blockserv) - mpin := pin.NewPinner(db, dserv) - return dagservAndPinner{ - ds: dserv, - mp: mpin, - } -} - func TestNode(t *testing.T) { n1 := NodeWithData([]byte("beep")) @@ -254,7 +233,7 @@ func TestEmptyKey(t *testing.T) { } func TestCantGet(t *testing.T) { - dsp := getDagservAndPinner(t) + ds := dstest.Mock() a := NodeWithData([]byte("A")) k, err := a.Key() @@ -262,7 +241,7 @@ func TestCantGet(t *testing.T) { t.Fatal(err) } - _, err = dsp.ds.Get(context.Background(), k) + _, err = ds.Get(context.Background(), k) if !strings.Contains(err.Error(), "not found") { t.Fatal("expected err not found, got: ", err) } diff --git a/pin/pin.go b/pin/pin.go index 49c5a8dd1e0..2628359cbc5 100644 --- a/pin/pin.go +++ b/pin/pin.go @@ -110,15 +110,14 @@ type pinner struct { // not delete them. internalPin map[key.Key]struct{} dserv mdag.DAGService + internal mdag.DAGService // dagservice used to store internal objects dstore ds.Datastore } // NewPinner creates a new pinner using the given datastore as a backend -func NewPinner(dstore ds.Datastore, serv mdag.DAGService) Pinner { +func NewPinner(dstore ds.Datastore, serv, internal mdag.DAGService) Pinner { - // Load set from given datastore... rcset := set.NewSimpleBlockSet() - dirset := set.NewSimpleBlockSet() return &pinner{ @@ -126,6 +125,7 @@ func NewPinner(dstore ds.Datastore, serv mdag.DAGService) Pinner { directPin: dirset, dserv: serv, dstore: dstore, + internal: internal, } } @@ -344,7 +344,7 @@ func (p *pinner) RemovePinWithMode(key key.Key, mode PinMode) { } // LoadPinner loads a pinner and its keysets from the given datastore -func LoadPinner(d ds.Datastore, dserv mdag.DAGService) (Pinner, error) { +func LoadPinner(d ds.Datastore, dserv, internal mdag.DAGService) (Pinner, error) { p := new(pinner) rootKeyI, err := d.Get(pinDatastoreKey) @@ -361,7 +361,7 @@ func LoadPinner(d ds.Datastore, dserv mdag.DAGService) (Pinner, error) { ctx, cancel := context.WithTimeout(context.TODO(), time.Second*5) defer cancel() - root, err := dserv.Get(ctx, rootKey) + root, err := internal.Get(ctx, rootKey) if err != nil { return nil, fmt.Errorf("cannot find pinning root object: %v", err) } @@ -374,7 +374,7 @@ func LoadPinner(d ds.Datastore, dserv mdag.DAGService) (Pinner, error) { } { // load recursive set - recurseKeys, err := loadSet(ctx, dserv, root, linkRecursive, recordInternal) + recurseKeys, err := loadSet(ctx, internal, root, linkRecursive, recordInternal) if err != nil { return nil, fmt.Errorf("cannot load recursive pins: %v", err) } @@ -382,7 +382,7 @@ func LoadPinner(d ds.Datastore, dserv mdag.DAGService) (Pinner, error) { } { // load direct set - directKeys, err := loadSet(ctx, dserv, root, linkDirect, recordInternal) + directKeys, err := loadSet(ctx, internal, root, linkDirect, recordInternal) if err != nil { return nil, fmt.Errorf("cannot load direct pins: %v", err) } @@ -394,6 +394,7 @@ func LoadPinner(d ds.Datastore, dserv mdag.DAGService) (Pinner, error) { // assign services p.dserv = dserv p.dstore = d + p.internal = internal return p, nil } @@ -422,7 +423,7 @@ func (p *pinner) Flush() error { root := &mdag.Node{} { - n, err := storeSet(ctx, p.dserv, p.directPin.GetKeys(), recordInternal) + n, err := storeSet(ctx, p.internal, p.directPin.GetKeys(), recordInternal) if err != nil { return err } @@ -432,7 +433,7 @@ func (p *pinner) Flush() error { } { - n, err := storeSet(ctx, p.dserv, p.recursePin.GetKeys(), recordInternal) + n, err := storeSet(ctx, p.internal, p.recursePin.GetKeys(), recordInternal) if err != nil { return err } @@ -442,12 +443,12 @@ func (p *pinner) Flush() error { } // add the empty node, its referenced by the pin sets but never created - _, err := p.dserv.Add(new(mdag.Node)) + _, err := p.internal.Add(new(mdag.Node)) if err != nil { return err } - k, err := p.dserv.Add(root) + k, err := p.internal.Add(root) if err != nil { return err } diff --git a/pin/pin_test.go b/pin/pin_test.go index 8e4cfd8a8b5..d6496618eeb 100644 --- a/pin/pin_test.go +++ b/pin/pin_test.go @@ -45,7 +45,7 @@ func TestPinnerBasic(t *testing.T) { dserv := mdag.NewDAGService(bserv) // TODO does pinner need to share datastore with blockservice? - p := NewPinner(dstore, dserv) + p := NewPinner(dstore, dserv, dserv) a, ak := randNode() _, err := dserv.Add(a) @@ -133,7 +133,7 @@ func TestPinnerBasic(t *testing.T) { t.Fatal(err) } - np, err := LoadPinner(dstore, dserv) + np, err := LoadPinner(dstore, dserv, dserv) if err != nil { t.Fatal(err) } @@ -154,7 +154,7 @@ func TestDuplicateSemantics(t *testing.T) { dserv := mdag.NewDAGService(bserv) // TODO does pinner need to share datastore with blockservice? - p := NewPinner(dstore, dserv) + p := NewPinner(dstore, dserv, dserv) a, _ := randNode() _, err := dserv.Add(a) @@ -187,7 +187,7 @@ func TestFlush(t *testing.T) { bserv := bs.New(bstore, offline.Exchange(bstore)) dserv := mdag.NewDAGService(bserv) - p := NewPinner(dstore, dserv) + p := NewPinner(dstore, dserv, dserv) _, k := randNode() p.PinWithMode(k, Recursive) @@ -204,7 +204,7 @@ func TestPinRecursiveFail(t *testing.T) { bserv := bs.New(bstore, offline.Exchange(bstore)) dserv := mdag.NewDAGService(bserv) - p := NewPinner(dstore, dserv) + p := NewPinner(dstore, dserv, dserv) a, _ := randNode() b, _ := randNode()