Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): protect MFS root in node with lock #8447

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cmd/ipfs/pinmfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ type ipfsPinMFSNode struct {
}

func (x *ipfsPinMFSNode) RootNode() (ipld.Node, error) {
return x.node.FilesRoot.GetDirectory().GetNode()
// FIXME(BLOCKING): We're deploying this lock taking mechanism as simply as
// possible for now deferring the unlock but we should be more precise.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See/Similar to comments below. That's the best we can do now. Even though this code has no effect at all (because the root cannot change inside LockedFilesRoot), it still seems useful to make someone staring at the code to be puzzled and investigate what this lock is all about.

filesRoot := x.node.LockedFilesRoot.RLock()
defer x.node.LockedFilesRoot.RUnlock()
return filesRoot.GetDirectory().GetNode()
}

func (x *ipfsPinMFSNode) Identity() peer.ID {
Expand Down
183 changes: 106 additions & 77 deletions core/commands/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import (
gopath "path"
"sort"
"strings"
"time"

humanize "github.com/dustin/go-humanize"
"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/core/commands/cmdenv"
"github.com/ipfs/go-ipfs/core/node"

bservice "github.com/ipfs/go-blockservice"
cid "github.com/ipfs/go-cid"
Expand Down Expand Up @@ -398,20 +400,23 @@ GC'ed.
return fmt.Errorf("cp: cannot get node from path %s: %s", src, err)
}

filesRoot := nd.LockedFilesRoot.Lock()
defer nd.LockedFilesRoot.Unlock()

if mkParents {
err := ensureContainingDirectoryExists(nd.FilesRoot, dst, prefix)
err := ensureContainingDirectoryExists(filesRoot, dst, prefix)
if err != nil {
return err
}
}

err = mfs.PutNode(nd.FilesRoot, dst, node)
err = mfs.PutNode(filesRoot, dst, node)
if err != nil {
return fmt.Errorf("cp: cannot put node in path %s: %s", dst, err)
}

if flush {
_, err := mfs.FlushPath(req.Context, nd.FilesRoot, dst)
_, err := mfs.FlushPath(req.Context, filesRoot, dst)
if err != nil {
return fmt.Errorf("cp: cannot flush the created file %s: %s", dst, err)
}
Expand All @@ -426,7 +431,9 @@ func getNodeFromPath(ctx context.Context, node *core.IpfsNode, api iface.CoreAPI
case strings.HasPrefix(p, "/ipfs/"):
return api.ResolveNode(ctx, path.New(p))
default:
fsn, err := mfs.Lookup(node.FilesRoot, p)
filesRoot := node.LockedFilesRoot.RLock()
defer node.LockedFilesRoot.RUnlock()
fsn, err := mfs.Lookup(filesRoot, p)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -491,7 +498,10 @@ Examples:
return err
}

fsn, err := mfs.Lookup(nd.FilesRoot, path)
filesRoot := nd.LockedFilesRoot.RLock()
defer nd.LockedFilesRoot.RUnlock()

fsn, err := mfs.Lookup(filesRoot, path)
if err != nil {
return err
}
Expand Down Expand Up @@ -611,7 +621,10 @@ Examples:
return err
}

fsn, err := mfs.Lookup(nd.FilesRoot, path)
filesRoot := nd.LockedFilesRoot.RLock()
defer nd.LockedFilesRoot.RUnlock()

fsn, err := mfs.Lookup(filesRoot, path)
if err != nil {
return err
}
Expand Down Expand Up @@ -706,9 +719,12 @@ Example:
return err
}

err = mfs.Mv(nd.FilesRoot, src, dst)
filesRoot := nd.LockedFilesRoot.Lock()
defer nd.LockedFilesRoot.Unlock()

err = mfs.Mv(filesRoot, src, dst)
if err == nil && flush {
_, err = mfs.FlushPath(req.Context, nd.FilesRoot, "/")
_, err = mfs.FlushPath(req.Context, filesRoot, "/")
}
return err
},
Expand Down Expand Up @@ -768,6 +784,9 @@ stat' on the file or any of its ancestors.
cmds.BoolOption(filesTruncateOptionName, "t", "Truncate the file to size zero before writing."),
cmds.Int64Option(filesCountOptionName, "n", "Maximum number of bytes to read."),
cmds.BoolOption(filesRawLeavesOptionName, "Use raw blocks for newly created leaf nodes. (experimental)"),
// FIXME(BLOCKING): Remove.
// Fake lock for manual testing.
cmds.IntOption("lock-time", "lt", "[TESTING] timeout to hold the MFS lock while copying").WithDefault(0),
cidVersionOption,
hashOption,
},
Expand Down Expand Up @@ -798,14 +817,21 @@ stat' on the file or any of its ancestors.
return fmt.Errorf("cannot have negative write offset")
}

filesRoot := nd.LockedFilesRoot.Lock()
defer nd.LockedFilesRoot.Unlock()
// FIXME(BLOCKING): Remove.
// Keep the hold for a fake, arbitrary amount of time to test it manually.
timeout, _ := req.Options["lock-time"].(int)
defer time.Sleep(time.Duration(timeout) * time.Second)

if mkParents {
err := ensureContainingDirectoryExists(nd.FilesRoot, path, prefix)
err := ensureContainingDirectoryExists(filesRoot, path, prefix)
if err != nil {
return err
}
}

fi, err := getFileHandle(nd.FilesRoot, path, create, prefix)
fi, err := getFileHandle(filesRoot, path, create, prefix)
if err != nil {
return err
}
Expand Down Expand Up @@ -904,7 +930,9 @@ Examples:
if err != nil {
return err
}
root := n.FilesRoot

root := n.LockedFilesRoot.Lock()
defer n.LockedFilesRoot.Unlock()

err = mfs.Mkdir(root, dirtomake, mfs.MkdirOpts{
Mkparents: dashp,
Expand Down Expand Up @@ -947,7 +975,10 @@ are run with the '--flush=false'.
path = req.Arguments[0]
}

n, err := mfs.FlushPath(req.Context, nd.FilesRoot, path)
filesRoot := nd.LockedFilesRoot.Lock()
defer nd.LockedFilesRoot.Unlock()

n, err := mfs.FlushPath(req.Context, filesRoot, path)
if err != nil {
return err
}
Expand Down Expand Up @@ -989,9 +1020,12 @@ Change the CID version or hash function of the root node of a given path.
return err
}

err = updatePath(nd.FilesRoot, path, prefix)
filesRoot := nd.LockedFilesRoot.Lock()
defer nd.LockedFilesRoot.Unlock()

err = updatePath(filesRoot, path, prefix)
if err == nil && flush {
_, err = mfs.FlushPath(req.Context, nd.FilesRoot, path)
_, err = mfs.FlushPath(req.Context, filesRoot, path)
}
return err
},
Expand Down Expand Up @@ -1052,74 +1086,13 @@ Remove files or directories.
for _, arg := range req.Arguments {
path, err := checkPath(arg)
if err != nil {
errs = append(errs, fmt.Errorf("%s: %w", arg, err))
continue
}

if path == "/" {
errs = append(errs, fmt.Errorf("%s: cannot delete root", path))
continue
}

// 'rm a/b/c/' will fail unless we trim the slash at the end
if path[len(path)-1] == '/' {
path = path[:len(path)-1]
}

dir, name := gopath.Split(path)

pdir, err := getParentDir(nd.FilesRoot, dir)
if err != nil {
if force && err == os.ErrNotExist {
continue
}
errs = append(errs, fmt.Errorf("%s: parent lookup: %w", path, err))
continue
}

if force {
err := pdir.Unlink(name)
if err != nil {
if err == os.ErrNotExist {
continue
}
errs = append(errs, fmt.Errorf("%s: %w", path, err))
continue
}
err = pdir.Flush()
if err != nil {
errs = append(errs, fmt.Errorf("%s: %w", path, err))
}
continue
}

// get child node by name, when the node is corrupted and nonexistent,
// it will return specific error.
child, err := pdir.Child(name)
if err != nil {
errs = append(errs, fmt.Errorf("%s: %w", path, err))
continue
}

switch child.(type) {
case *mfs.Directory:
if !dashr {
errs = append(errs, fmt.Errorf("%s is a directory, use -r to remove directories", path))
continue
}
}

err = pdir.Unlink(name)
if err != nil {
errs = append(errs, fmt.Errorf("%s: %w", path, err))
errs = append(errs, fmt.Errorf("%s is not a valid path: %w", arg, err))
continue
}

err = pdir.Flush()
if err != nil {
if err := removePath(nd.LockedFilesRoot, path, force, dashr); err != nil {
errs = append(errs, fmt.Errorf("%s: %w", path, err))
}
continue
}
if len(errs) > 0 {
for _, err = range errs {
Expand All @@ -1134,6 +1107,62 @@ Remove files or directories.
},
}

func removePath(lockedFilesRoot *node.LockedFilesRoot, path string, force bool, dashr bool) error {
if path == "/" {
return fmt.Errorf("cannot delete root")
}

// 'rm a/b/c/' will fail unless we trim the slash at the end
if path[len(path)-1] == '/' {
path = path[:len(path)-1]
}

dir, name := gopath.Split(path)

filesRoot := lockedFilesRoot.Lock()
defer lockedFilesRoot.Unlock()

pdir, err := getParentDir(filesRoot, dir)
if err != nil {
if force && err == os.ErrNotExist {
return nil
}
return err
}

if force {
err := pdir.Unlink(name)
if err != nil {
if err == os.ErrNotExist {
return nil
}
return err
}
return pdir.Flush()
}

// get child node by name, when the node is corrupted and nonexistent,
// it will return specific error.
child, err := pdir.Child(name)
if err != nil {
return err
}

switch child.(type) {
case *mfs.Directory:
if !dashr {
return fmt.Errorf("path is a directory, use -r to remove directories")
}
}

err = pdir.Unlink(name)
if err != nil {
return err
}

return pdir.Flush()
}

func getPrefixNew(req *cmds.Request) (cid.Builder, error) {
cidVer, cidVerSet := req.Options[filesCidVersionOptionName].(int)
hashFunStr, hashFunSet := req.Options[filesHashOptionName].(string)
Expand Down
13 changes: 11 additions & 2 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/ipfs/go-ipfs-provider"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"
mfs "github.com/ipfs/go-mfs"
goprocess "github.com/jbenet/goprocess"
connmgr "github.com/libp2p/go-libp2p-core/connmgr"
ic "github.com/libp2p/go-libp2p-core/crypto"
Expand Down Expand Up @@ -81,7 +80,17 @@ type IpfsNode struct {
UnixFSFetcherFactory fetcher.Factory `name:"unixfsFetcher"` // fetcher that interprets UnixFS data
Reporter *metrics.BandwidthCounter `optional:"true"`
Discovery mdns.Service `optional:"true"`
FilesRoot *mfs.Root
// FIXME(BLOCKING): Decided on this.
// Temporarily masking FilesRoot to make sure we're covering *all* MFS accesses.
// We should evaluate later if we either want to:
// * leave it in place for backward compatibility (this would be very risky
// given we would be exposing an unlocked version of the MFS root)
// * remove it completely (breaking interface) in favor of LockedFilesRoot;
// this would seem the cleaner option
// * some in-between like renaming LockedFilesRoot to the original FilesRoot
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would go for this option, because this will help someone using the new version of the code figure out what is going on. It does not look like there is any danger of old incorrect code compiling quietly.

// (which would still be breaking the API and could even be more confusing)
// FilesRoot *mfs.Root
LockedFilesRoot *node.LockedFilesRoot
RecordValidator record.Validator

// Online
Expand Down
8 changes: 6 additions & 2 deletions core/corerepo/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ func BestEffortRoots(filesRoot *mfs.Root) ([]cid.Cid, error) {
}

func GarbageCollect(n *core.IpfsNode, ctx context.Context) error {
roots, err := BestEffortRoots(n.FilesRoot)
filesRoot := n.LockedFilesRoot.RLock()
defer n.LockedFilesRoot.RUnlock()
roots, err := BestEffortRoots(filesRoot)
if err != nil {
return err
}
Expand Down Expand Up @@ -148,7 +150,9 @@ func (e *MultiError) Error() string {
}

func GarbageCollectAsync(n *core.IpfsNode, ctx context.Context) <-chan gc.Result {
roots, err := BestEffortRoots(n.FilesRoot)
filesRoot := n.LockedFilesRoot.RLock()
defer n.LockedFilesRoot.RUnlock()
roots, err := BestEffortRoots(filesRoot)
if err != nil {
out := make(chan gc.Result)
out <- gc.Result{Error: err}
Expand Down
Loading