Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

feat!: add and connect missing context, remove RemovePinWithMode #23

Merged
merged 2 commits into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 68 additions & 97 deletions dspinner/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ import (
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
ipfspinner "github.com/ipfs/go-ipfs-pinner"
"github.com/ipfs/go-ipfs-pinner/dsindex"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"
"github.com/ipfs/go-merkledag"
"github.com/ipfs/go-merkledag/dagutils"
"github.com/polydawn/refmt/cbor"
"github.com/polydawn/refmt/obj/atlas"

ipfspinner "github.com/ipfs/go-ipfs-pinner"
"github.com/ipfs/go-ipfs-pinner/dsindex"
)

const (
Expand Down Expand Up @@ -179,23 +180,30 @@ func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error {
return err
}

c := node.Cid()
if recurse {
return p.doPinRecursive(ctx, node.Cid(), true)
} else {
return p.doPinDirect(ctx, node.Cid())
}
}

func (p *pinner) doPinRecursive(ctx context.Context, c cid.Cid, fetch bool) error {
cidKey := c.KeyString()

p.lock.Lock()
defer p.lock.Unlock()

if recurse {
found, err := p.cidRIndex.HasAny(ctx, cidKey)
if err != nil {
return err
}
if found {
return nil
}
found, err := p.cidRIndex.HasAny(ctx, cidKey)
if err != nil {
return err
}
if found {
return nil
}

dirtyBefore := p.dirty
dirtyBefore := p.dirty

if fetch {
// temporary unlock to fetch the entire graph
p.lock.Unlock()
// Fetch graph starting at node identified by cid
Expand All @@ -204,54 +212,63 @@ func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error {
if err != nil {
return err
}
}

// If autosyncing, sync dag service before making any change to pins
err = p.flushDagService(ctx, false)
if err != nil {
return err
}

// Only look again if something has changed.
if p.dirty != dirtyBefore {
found, err = p.cidRIndex.HasAny(ctx, cidKey)
if err != nil {
return err
}
if found {
return nil
}
}
// If autosyncing, sync dag service before making any change to pins
err = p.flushDagService(ctx, false)
if err != nil {
return err
}

// TODO: remove this to support multiple pins per CID
found, err = p.cidDIndex.HasAny(ctx, cidKey)
// Only look again if something has changed.
if p.dirty != dirtyBefore {
found, err = p.cidRIndex.HasAny(ctx, cidKey)
if err != nil {
return err
}
if found {
_, err = p.removePinsForCid(ctx, c, ipfspinner.Direct)
if err != nil {
return err
}
return nil
}
}

_, err = p.addPin(ctx, c, ipfspinner.Recursive, "")
if err != nil {
return err
}
} else {
found, err := p.cidRIndex.HasAny(ctx, cidKey)
// TODO: remove this to support multiple pins per CID
found, err = p.cidDIndex.HasAny(ctx, cidKey)
if err != nil {
return err
}
if found {
_, err = p.removePinsForCid(ctx, c, ipfspinner.Direct)
if err != nil {
return err
}
if found {
return fmt.Errorf("%s already pinned recursively", c.String())
}
}

_, err = p.addPin(ctx, c, ipfspinner.Direct, "")
if err != nil {
return err
}
_, err = p.addPin(ctx, c, ipfspinner.Recursive, "")
if err != nil {
return err
}
return p.flushPins(ctx, false)
}

func (p *pinner) doPinDirect(ctx context.Context, c cid.Cid) error {
cidKey := c.KeyString()

p.lock.Lock()
defer p.lock.Unlock()

found, err := p.cidRIndex.HasAny(ctx, cidKey)
if err != nil {
return err
}
if found {
return fmt.Errorf("%s already pinned recursively", c.String())
}

_, err = p.addPin(ctx, c, ipfspinner.Direct, "")
if err != nil {
return err
}

return p.flushPins(ctx, false)
}

Expand Down Expand Up @@ -555,35 +572,6 @@ func (p *pinner) CheckIfPinned(ctx context.Context, cids ...cid.Cid) ([]ipfspinn
return pinned, nil
}

// RemovePinWithMode is for manually editing the pin structure.
// Use with care! If used improperly, garbage collection may not
// be successful.
func (p *pinner) RemovePinWithMode(c cid.Cid, mode ipfspinner.Mode) {
ctx := context.TODO()
// Check cache to see if CID is pinned
switch mode {
case ipfspinner.Direct, ipfspinner.Recursive:
default:
// programmer error, panic OK
panic("unrecognized pin type")
}

p.lock.Lock()
defer p.lock.Unlock()

removed, err := p.removePinsForCid(ctx, c, mode)
if err != nil {
log.Error("cound not remove pins: %s", err)
return
}
if !removed {
return
}
if err = p.flushPins(ctx, false); err != nil {
log.Error("cound not remove pins: %s", err)
}
}

// removePinsForCid removes all pins for a cid that has the specified mode.
// Returns true if any pins, and all corresponding CID index entries, were
// removed. Otherwise, returns false.
Expand Down Expand Up @@ -826,32 +814,15 @@ func (p *pinner) Flush(ctx context.Context) error {

// PinWithMode allows the user to have fine grained control over pin
// counts
func (p *pinner) PinWithMode(c cid.Cid, mode ipfspinner.Mode) {
ctx := context.TODO()

p.lock.Lock()
defer p.lock.Unlock()

func (p *pinner) PinWithMode(ctx context.Context, c cid.Cid, mode ipfspinner.Mode) error {
// TODO: remove his to support multiple pins per CID
switch mode {
case ipfspinner.Recursive:
if has, _ := p.cidRIndex.HasAny(ctx, c.KeyString()); has {
return // already a recursive pin for this CID
}
return p.doPinRecursive(ctx, c, false)
case ipfspinner.Direct:
if has, _ := p.cidDIndex.HasAny(ctx, c.KeyString()); has {
return // already a direct pin for this CID
}
return p.doPinDirect(ctx, c)
default:
panic("unrecognized pin mode")
}

_, err := p.addPin(ctx, c, mode, "")
if err != nil {
return
}
if err = p.flushPins(ctx, false); err != nil {
log.Errorf("failed to create %s pin: %s", mode, err)
return fmt.Errorf("unrecognized pin mode")
}
}

Expand Down
44 changes: 3 additions & 41 deletions dspinner/pin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ import (
lds "github.com/ipfs/go-ds-leveldb"
blockstore "github.com/ipfs/go-ipfs-blockstore"
offline "github.com/ipfs/go-ipfs-exchange-offline"
ipfspin "github.com/ipfs/go-ipfs-pinner"
util "github.com/ipfs/go-ipfs-util"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"

ipfspin "github.com/ipfs/go-ipfs-pinner"
)

var rand = util.NewTimeSeededRand()
Expand Down Expand Up @@ -375,45 +376,6 @@ func TestAddLoadPin(t *testing.T) {
}
}

func TestRemovePinWithMode(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

dstore := dssync.MutexWrap(ds.NewMapDatastore())
bstore := blockstore.NewBlockstore(dstore)
bserv := bs.New(bstore, offline.Exchange(bstore))

dserv := mdag.NewDAGService(bserv)

p, err := New(ctx, dstore, dserv)
if err != nil {
t.Fatal(err)
}

a, ak := randNode()
err = dserv.Add(ctx, a)
if err != nil {
panic(err)
}

err = p.Pin(ctx, a, false)
if err != nil {
t.Fatal(err)
}

ok, err := p.removePinsForCid(ctx, ak, ipfspin.Recursive)
if err != nil {
t.Fatal(err)
}
if ok {
t.Error("pin should not have been removed")
}

p.RemovePinWithMode(ak, ipfspin.Direct)

assertUnpinned(t, p, ak, "pin was not removed")
}

func TestIsPinnedLookup(t *testing.T) {
// Test that lookups work in pins which share
// the same branches. For that construct this tree:
Expand Down Expand Up @@ -523,7 +485,7 @@ func TestFlush(t *testing.T) {
}
_, k := randNode()

p.PinWithMode(k, ipfspin.Recursive)
p.PinWithMode(ctx, k, ipfspin.Recursive)
if err = p.Flush(ctx); err != nil {
t.Fatal(err)
}
Expand Down
9 changes: 3 additions & 6 deletions pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ type Pinner interface {
IsPinnedWithType(ctx context.Context, c cid.Cid, mode Mode) (string, bool, error)

// Pin the given node, optionally recursively.
// Pin will make sure that the given node and its children if recursive is set
// are stored locally.
Pin(ctx context.Context, node ipld.Node, recursive bool) error

// Unpin the given cid. If recursive is true, removes either a recursive or
Expand All @@ -111,12 +113,7 @@ type Pinner interface {
// PinWithMode is for manually editing the pin structure. Use with
// care! If used improperly, garbage collection may not be
// successful.
PinWithMode(cid.Cid, Mode)

// RemovePinWithMode is for manually editing the pin structure.
// Use with care! If used improperly, garbage collection may not
// be successful.
RemovePinWithMode(cid.Cid, Mode)
PinWithMode(context.Context, cid.Cid, Mode) error

// Flush writes the pin state to the backing datastore
Flush(ctx context.Context) error
Expand Down