change pinning to happen in a callback #1274

Merged
merged 4 commits on May 30, 2015
Changes from 3 commits
19 changes: 15 additions & 4 deletions core/commands/add.go
@@ -7,7 +7,6 @@ import (
"strings"

"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/cheggaaa/pb"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"

cmds "github.com/ipfs/go-ipfs/commands"
files "github.com/ipfs/go-ipfs/commands/files"
Expand All @@ -16,6 +15,7 @@ import (
importer "github.com/ipfs/go-ipfs/importer"
"github.com/ipfs/go-ipfs/importer/chunk"
dag "github.com/ipfs/go-ipfs/merkledag"
pin "github.com/ipfs/go-ipfs/pin"
ft "github.com/ipfs/go-ipfs/unixfs"
u "github.com/ipfs/go-ipfs/util"
)
@@ -113,12 +113,16 @@ remains to be implemented.
return
}

err = n.Pinning.Pin(context.Background(), rootnd, true)
rnk, err := rootnd.Key()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

mp := n.Pinning.GetManual()
mp.RemovePinWithMode(rnk, pin.Indirect)
Member:
why is this being removed?

Member:
because it is added below (for all files found) and then re-added here?

why not just not pin anything until the end? i.e. remove the call to pin below and only pin recursively here after adding?

is the worry that something will GC in-between, while adding? (if so, ok makes sense).

Member Author:
> why not just not pin anything until the end?

that's what this PR is trying to avoid doing. In order to pin after the fact, you have to pull every single block in the dag back off the disk and write the pin for it. This is really expensive when we get into the 50+ GB range of add operations (or the 10 MB+ range on my pi).
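
For illustration, a minimal sketch of the trade-off described above; the Key/Node/DAGService/Pinner types are simplified stand-ins, not the real go-ipfs interfaces:

```go
package pinsketch

// Key identifies a block; Node, DAGService and Pinner are simplified
// stand-ins used only to show the shape of the two approaches.
type Key string

type Node struct {
	Key      Key
	Children []Key
}

type DAGService interface {
	Get(Key) (*Node, error) // reads a block back off disk
}

type Pinner interface {
	PinIndirect(Key)
	PinRecursive(Key)
}

// pinAfterTheFact is the old shape: once the add finishes, the whole
// dag has to be walked again, re-reading every block just to record
// its pin. That re-read per block is the cost described above.
func pinAfterTheFact(ds DAGService, p Pinner, root Key) error {
	p.PinRecursive(root)
	return walk(ds, root, func(k Key) { p.PinIndirect(k) })
}

func walk(ds DAGService, k Key, visit func(Key)) error {
	nd, err := ds.Get(k) // one extra disk read per block already written
	if err != nil {
		return err
	}
	for _, c := range nd.Children {
		visit(c)
		if err := walk(ds, c, visit); err != nil {
			return err
		}
	}
	return nil
}

// pinDuringAdd is the shape this PR moves to: the importer already has
// each node in memory as it writes it, so a callback can pin it then,
// and only the root needs a final recursive pin.
func pinDuringAdd(p Pinner, added <-chan *Node, root Key) {
	for nd := range added {
		p.PinIndirect(nd.Key) // no re-read needed
	}
	p.PinRecursive(root)
}
```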

Member Author:
> is the worry that something will GC in-between, while adding? (if so, ok makes sense).

This too; a poorly timed GC could end really badly for us (although it's still not perfectly safe).

Member:
maybe mark and sweep is TRTTD
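
For reference, mark-and-sweep over the blockstore would look roughly like the sketch below. The Blockstore interface here is an assumption made for the example; nothing like this is part of the PR.

```go
package gcsketch

type Key string

// Blockstore is an assumed interface for this sketch: enumerate all
// stored keys, list the links of a block, and delete a block.
type Blockstore interface {
	AllKeys() ([]Key, error)
	Links(Key) ([]Key, error)
	Delete(Key) error
}

// MarkAndSweep marks everything reachable from the pin roots, then
// deletes every block that was never marked. Unlike refcount-style
// pinning, nothing has to be recorded at add time.
func MarkAndSweep(bs Blockstore, pinRoots []Key) error {
	marked := make(map[Key]bool)

	var mark func(Key) error
	mark = func(k Key) error {
		if marked[k] {
			return nil
		}
		marked[k] = true
		links, err := bs.Links(k)
		if err != nil {
			return err
		}
		for _, l := range links {
			if err := mark(l); err != nil {
				return err
			}
		}
		return nil
	}

	// Mark phase: walk out from every pinned root.
	for _, r := range pinRoots {
		if err := mark(r); err != nil {
			return err
		}
	}

	// Sweep phase: anything unmarked is garbage.
	keys, err := bs.AllKeys()
	if err != nil {
		return err
	}
	for _, k := range keys {
		if !marked[k] {
			if err := bs.Delete(k); err != nil {
				return err
			}
		}
	}
	return nil
}
```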

Contributor:
On Wed, May 27, 2015 at 11:42:36AM -0700, Jeromy Johnson wrote:

> is the worry that something will GC in-between, while adding? (if so, ok makes sense).
>
> This too; a poorly timed GC could end really badly for us (although it's still not perfectly safe).

I think the right approach here is to have transactional changes to the pin tree (see also the commit sessions discussed in #1187). Coupling that with something like Git's gc.pruneExpire to avoid garbage-collecting recently created objects (say, in the last week?) would give us better safety here. It would also protect folks from accidentally GC'ing recent objects they'd forgotten to tag.
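
A grace period along those lines is straightforward to sketch, assuming the store could report when a block was written. The interface below is hypothetical; go-ipfs does not expose anything like it here.

```go
package gcsketch

import "time"

// TimestampedStore is a hypothetical interface for this sketch: it can
// report when a given block was first written.
type TimestampedStore interface {
	WrittenAt(key string) (time.Time, error)
}

// EligibleForGC mimics Git's gc.pruneExpire: a block only becomes a GC
// candidate once it is older than the grace period (say, a week), so a
// concurrent add can't lose freshly written, not-yet-pinned blocks.
func EligibleForGC(s TimestampedStore, key string, grace time.Duration) (bool, error) {
	t, err := s.WrittenAt(key)
	if err != nil {
		return false, err
	}
	return time.Since(t) > grace, nil
}
```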

Member:
> Coupling that with something like Git's gc.pruneExpire

that's a great solution. of course it comes from git.

i'm really concerned about safety here--

@whyrusleeping @tv42 -- what's the plan long term?

mp.PinWithMode(rnk, pin.Recursive)

err = n.Pinning.Flush()
if err != nil {
res.SetError(err, cmds.ErrNormal)
@@ -214,7 +218,12 @@ remains to be implemented.
}

func add(n *core.IpfsNode, reader io.Reader) (*dag.Node, error) {
node, err := importer.BuildDagFromReader(reader, n.DAG, nil, chunk.DefaultSplitter)
node, err := importer.BuildDagFromReader(
reader,
n.DAG,
chunk.DefaultSplitter,
importer.PinIndirectCB(n.Pinning.GetManual()),
)
if err != nil {
return nil, err
}
@@ -290,11 +299,13 @@ func addDir(n *core.IpfsNode, dir files.File, out chan interface{}, progress boo
return nil, err
}

_, err = n.DAG.Add(tree)
k, err := n.DAG.Add(tree)
if err != nil {
return nil, err
}

n.Pinning.GetManual().PinWithMode(k, pin.Indirect)

return tree, nil
}

2 changes: 1 addition & 1 deletion core/corehttp/gateway_handler.go
@@ -72,7 +72,7 @@ func (i *gatewayHandler) newDagFromReader(r io.Reader) (*dag.Node, error) {
// TODO(cryptix): change and remove this helper once PR1136 is merged
// return ufs.AddFromReader(i.node, r.Body)
return importer.BuildDagFromReader(
r, i.node.DAG, i.node.Pinning.GetManual(), chunk.DefaultSplitter)
r, i.node.DAG, chunk.DefaultSplitter, importer.BasicPinnerCB(i.node.Pinning.GetManual()))
}

// TODO(btc): break this apart into separate handlers using a more expressive muxer
33 changes: 18 additions & 15 deletions core/coreunix/add.go
@@ -1,7 +1,6 @@
package coreunix

import (
"errors"
"io"
"io/ioutil"
"os"
@@ -29,15 +28,12 @@ func Add(n *core.IpfsNode, r io.Reader) (string, error) {
dagNode, err := importer.BuildDagFromReader(
r,
n.DAG,
n.Pinning.GetManual(), // Fix this interface
chunk.DefaultSplitter,
importer.BasicPinnerCB(n.Pinning.GetManual()),
)
if err != nil {
return "", err
}
if err := n.Pinning.Flush(); err != nil {
return "", err
}
k, err := dagNode.Key()
if err != nil {
return "", err
@@ -53,18 +49,28 @@ func AddR(n *core.IpfsNode, root string) (key string, err error) {
return "", err
}
defer f.Close()

ff, err := files.NewSerialFile(root, f)
if err != nil {
return "", err
}

dagnode, err := addFile(n, ff)
if err != nil {
return "", err
}

k, err := dagnode.Key()
if err != nil {
return "", err
}

n.Pinning.GetManual().RemovePinWithMode(k, pin.Indirect)
err = n.Pinning.Flush()
if err != nil {
return "", err
}

return k.String(), nil
}

@@ -87,17 +93,14 @@ func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, *merkle
}

func add(n *core.IpfsNode, reader io.Reader) (*merkledag.Node, error) {
mp, ok := n.Pinning.(pin.ManualPinner)
if !ok {
return nil, errors.New("invalid pinner type! expected manual pinner")
}

node, err := importer.BuildDagFromReader(reader, n.DAG, mp, chunk.DefaultSplitter)
if err != nil {
return nil, err
}
mp := n.Pinning.GetManual()

err = n.Pinning.Flush()
node, err := importer.BuildDagFromReader(
reader,
n.DAG,
chunk.DefaultSplitter,
importer.PinIndirectCB(mp),
)
if err != nil {
return nil, err
}
2 changes: 1 addition & 1 deletion core/coreunix/metadata_test.go
@@ -37,7 +37,7 @@ func TestMetadata(t *testing.T) {
data := make([]byte, 1000)
u.NewTimeSeededRand().Read(data)
r := bytes.NewReader(data)
nd, err := importer.BuildDagFromReader(r, ds, nil, chunk.DefaultSplitter)
nd, err := importer.BuildDagFromReader(r, ds, chunk.DefaultSplitter, nil)
if err != nil {
t.Fatal(err)
}
2 changes: 1 addition & 1 deletion fuse/readonly/ipfs_test.go
@@ -34,7 +34,7 @@ func randObj(t *testing.T, nd *core.IpfsNode, size int64) (*dag.Node, []byte) {
buf := make([]byte, size)
u.NewTimeSeededRand().Read(buf)
read := bytes.NewReader(buf)
obj, err := importer.BuildTrickleDagFromReader(read, nd.DAG, nil, chunk.DefaultSplitter)
obj, err := importer.BuildTrickleDagFromReader(read, nd.DAG, chunk.DefaultSplitter, nil)
if err != nil {
t.Fatal(err)
}
31 changes: 21 additions & 10 deletions importer/helpers/dagbuilder.go
@@ -5,6 +5,13 @@ import (
"github.com/ipfs/go-ipfs/pin"
)

// NodeCB is callback function for dag generation
// the `root` flag signifies whether or not this is
// the root of a dag.
Member:
i mean this is weird, right, because every node is the root of a dag.

maybe

the `last` flag signifies whether or not this is the last 
(top-most root) node being added. useful for things like
only pinning the first node recursively.

Member Author:
SGTM

type NodeCB func(node *dag.Node, root bool) error

var nilFunc NodeCB = func(_ *dag.Node, _ bool) error { return nil }

// DagBuilderHelper wraps together a bunch of objects needed to
// efficiently create unixfs dag trees
type DagBuilderHelper struct {
@@ -13,6 +20,7 @@ type DagBuilderHelper struct {
in <-chan []byte
nextData []byte // the next item to return.
maxlinks int
ncb NodeCB
}

type DagBuilderParams struct {
@@ -22,18 +30,23 @@ type DagBuilderParams struct {
// DAGService to write blocks to (required)
Dagserv dag.DAGService

// Pinner to use for pinning files (optionally nil)
Pinner pin.ManualPinner
// Callback for each block added
NodeCB NodeCB
}

// Generate a new DagBuilderHelper from the given params, using 'in' as a
// data source
func (dbp *DagBuilderParams) New(in <-chan []byte) *DagBuilderHelper {
ncb := dbp.NodeCB
if ncb == nil {
ncb = nilFunc
}

return &DagBuilderHelper{
dserv: dbp.Dagserv,
mp: dbp.Pinner,
in: in,
maxlinks: dbp.Maxlinks,
ncb: ncb,
}
}

@@ -125,17 +138,15 @@ func (db *DagBuilderHelper) Add(node *UnixfsNode) (*dag.Node, error) {
return nil, err
}

key, err := db.dserv.Add(dn)
_, err = db.dserv.Add(dn)
if err != nil {
return nil, err
}

if db.mp != nil {
db.mp.PinWithMode(key, pin.Recursive)
err := db.mp.Flush()
if err != nil {
return nil, err
}
// node callback
err = db.ncb(dn, true)
if err != nil {
return nil, err
}

return dn, nil
7 changes: 4 additions & 3 deletions importer/helpers/helpers.go
@@ -107,14 +107,15 @@ func (n *UnixfsNode) AddChild(child *UnixfsNode, db *DagBuilderHelper) error {
return err
}

childkey, err := db.dserv.Add(childnode)
_, err = db.dserv.Add(childnode)
if err != nil {
return err
}

// Pin the child node indirectly
if db.mp != nil {
db.mp.PinWithMode(childkey, pin.Indirect)
err = db.ncb(childnode, false)
if err != nil {
return err
}

return nil
43 changes: 36 additions & 7 deletions importer/importer.go
@@ -13,10 +13,10 @@ import (
trickle "github.com/ipfs/go-ipfs/importer/trickle"
dag "github.com/ipfs/go-ipfs/merkledag"
"github.com/ipfs/go-ipfs/pin"
"github.com/ipfs/go-ipfs/util"
u "github.com/ipfs/go-ipfs/util"
)

var log = util.Logger("importer")
var log = u.Logger("importer")

// Builds a DAG from the given file, writing created blocks to disk as they are
// created
@@ -36,31 +36,60 @@ func BuildDagFromFile(fpath string, ds dag.DAGService, mp pin.ManualPinner) (*da
}
defer f.Close()

return BuildDagFromReader(f, ds, mp, chunk.DefaultSplitter)
return BuildDagFromReader(f, ds, chunk.DefaultSplitter, BasicPinnerCB(mp))
}

func BuildDagFromReader(r io.Reader, ds dag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*dag.Node, error) {
func BuildDagFromReader(r io.Reader, ds dag.DAGService, spl chunk.BlockSplitter, ncb h.NodeCB) (*dag.Node, error) {
// Start the splitter
blkch := spl.Split(r)

dbp := h.DagBuilderParams{
Dagserv: ds,
Maxlinks: h.DefaultLinksPerBlock,
Pinner: mp,
NodeCB: ncb,
}

return bal.BalancedLayout(dbp.New(blkch))
}

func BuildTrickleDagFromReader(r io.Reader, ds dag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*dag.Node, error) {
func BuildTrickleDagFromReader(r io.Reader, ds dag.DAGService, spl chunk.BlockSplitter, ncb h.NodeCB) (*dag.Node, error) {
// Start the splitter
blkch := spl.Split(r)

dbp := h.DagBuilderParams{
Dagserv: ds,
Maxlinks: h.DefaultLinksPerBlock,
Pinner: mp,
NodeCB: ncb,
}

return trickle.TrickleLayout(dbp.New(blkch))
}

func BasicPinnerCB(p pin.ManualPinner) h.NodeCB {
return func(n *dag.Node, root bool) error {
k, err := n.Key()
if err != nil {
return err
}

if root {
p.PinWithMode(k, pin.Recursive)
return p.Flush()
} else {
p.PinWithMode(k, pin.Indirect)
return nil
}
}
}

func PinIndirectCB(p pin.ManualPinner) h.NodeCB {
return func(n *dag.Node, root bool) error {
k, err := n.Key()
if err != nil {
return err
}

p.PinWithMode(k, pin.Indirect)
return nil
}
}
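
For readers skimming the diff, the new call shape from a caller's side is sketched below, mirroring the coreunix and gateway changes in this PR: BasicPinnerCB when the importer should pin the root recursively itself, PinIndirectCB when the caller pins the root afterwards (as core/commands/add.go now does). The example assumes an already-constructed *core.IpfsNode.

```go
package example

// Sketch of calling the new importer API from this PR: the pinner is
// no longer passed directly; a NodeCB decides what to pin as each node
// is written.
import (
	"io"

	core "github.com/ipfs/go-ipfs/core"
	importer "github.com/ipfs/go-ipfs/importer"
	chunk "github.com/ipfs/go-ipfs/importer/chunk"
	dag "github.com/ipfs/go-ipfs/merkledag"
)

func addAndPin(n *core.IpfsNode, r io.Reader) (*dag.Node, error) {
	// BasicPinnerCB pins the root recursively (and flushes the pin
	// state); every other node is pinned indirectly as it is written.
	return importer.BuildDagFromReader(
		r,
		n.DAG,
		chunk.DefaultSplitter,
		importer.BasicPinnerCB(n.Pinning.GetManual()),
	)
}
```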
6 changes: 3 additions & 3 deletions importer/importer_test.go
@@ -17,7 +17,7 @@ import (
func getBalancedDag(t testing.TB, size int64, blksize int) (*dag.Node, dag.DAGService) {
ds := mdtest.Mock(t)
r := io.LimitReader(u.NewTimeSeededRand(), size)
nd, err := BuildDagFromReader(r, ds, nil, &chunk.SizeSplitter{blksize})
nd, err := BuildDagFromReader(r, ds, &chunk.SizeSplitter{blksize}, nil)
if err != nil {
t.Fatal(err)
}
@@ -27,7 +27,7 @@ func getTrickleDag(t testing.TB, size int64, blksize int) (*dag.Node, dag.DAGSe
func getTrickleDag(t testing.TB, size int64, blksize int) (*dag.Node, dag.DAGService) {
ds := mdtest.Mock(t)
r := io.LimitReader(u.NewTimeSeededRand(), size)
nd, err := BuildTrickleDagFromReader(r, ds, nil, &chunk.SizeSplitter{blksize})
nd, err := BuildTrickleDagFromReader(r, ds, &chunk.SizeSplitter{blksize}, nil)
if err != nil {
t.Fatal(err)
}
@@ -40,7 +40,7 @@ func TestBalancedDag(t *testing.T) {
u.NewTimeSeededRand().Read(buf)
r := bytes.NewReader(buf)

nd, err := BuildDagFromReader(r, ds, nil, chunk.DefaultSplitter)
nd, err := BuildDagFromReader(r, ds, chunk.DefaultSplitter, nil)
if err != nil {
t.Fatal(err)
}
2 changes: 1 addition & 1 deletion merkledag/merkledag_test.go
@@ -156,7 +156,7 @@ func runBatchFetchTest(t *testing.T, read io.Reader) {

spl := &chunk.SizeSplitter{512}

root, err := imp.BuildDagFromReader(read, dagservs[0], nil, spl)
root, err := imp.BuildDagFromReader(read, dagservs[0], spl, nil)
if err != nil {
t.Fatal(err)
}
3 changes: 2 additions & 1 deletion unixfs/mod/dagmodifier.go
@@ -11,6 +11,7 @@ import (
mh "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"

imp "github.com/ipfs/go-ipfs/importer"
chunk "github.com/ipfs/go-ipfs/importer/chunk"
help "github.com/ipfs/go-ipfs/importer/helpers"
trickle "github.com/ipfs/go-ipfs/importer/trickle"
@@ -308,7 +309,7 @@ func (dm *DagModifier) appendData(node *mdag.Node, blks <-chan []byte) (*mdag.No
dbp := &help.DagBuilderParams{
Dagserv: dm.dagserv,
Maxlinks: help.DefaultLinksPerBlock,
Pinner: dm.mp,
NodeCB: imp.BasicPinnerCB(dm.mp),
}

return trickle.TrickleAppend(node, dbp.New(blks))