From 367aa905940820fe2cd98e9553e9c59e247092db Mon Sep 17 00:00:00 2001 From: Lucas Molas Date: Tue, 30 Nov 2021 15:35:32 -0300 Subject: [PATCH 1/2] chore(cmds): encapsulate ipfs rm logic in another function --- core/commands/files.go | 118 +++++++++++++++++++---------------------- 1 file changed, 55 insertions(+), 63 deletions(-) diff --git a/core/commands/files.go b/core/commands/files.go index 3f7e3e6b9a9..a0adc754f9a 100644 --- a/core/commands/files.go +++ b/core/commands/files.go @@ -1052,74 +1052,13 @@ Remove files or directories. for _, arg := range req.Arguments { path, err := checkPath(arg) if err != nil { - errs = append(errs, fmt.Errorf("%s: %w", arg, err)) + errs = append(errs, fmt.Errorf("%s is not a valid path: %w", arg, err)) continue } - if path == "/" { - errs = append(errs, fmt.Errorf("%s: cannot delete root", path)) - continue - } - - // 'rm a/b/c/' will fail unless we trim the slash at the end - if path[len(path)-1] == '/' { - path = path[:len(path)-1] - } - - dir, name := gopath.Split(path) - - pdir, err := getParentDir(nd.FilesRoot, dir) - if err != nil { - if force && err == os.ErrNotExist { - continue - } - errs = append(errs, fmt.Errorf("%s: parent lookup: %w", path, err)) - continue - } - - if force { - err := pdir.Unlink(name) - if err != nil { - if err == os.ErrNotExist { - continue - } - errs = append(errs, fmt.Errorf("%s: %w", path, err)) - continue - } - err = pdir.Flush() - if err != nil { - errs = append(errs, fmt.Errorf("%s: %w", path, err)) - } - continue - } - - // get child node by name, when the node is corrupted and nonexistent, - // it will return specific error. 
- child, err := pdir.Child(name) - if err != nil { - errs = append(errs, fmt.Errorf("%s: %w", path, err)) - continue - } - - switch child.(type) { - case *mfs.Directory: - if !dashr { - errs = append(errs, fmt.Errorf("%s is a directory, use -r to remove directories", path)) - continue - } - } - - err = pdir.Unlink(name) - if err != nil { - errs = append(errs, fmt.Errorf("%s: %w", path, err)) - continue - } - - err = pdir.Flush() - if err != nil { + if err := removePath(nd.FilesRoot, path, force, dashr); err != nil { errs = append(errs, fmt.Errorf("%s: %w", path, err)) } - continue } if len(errs) > 0 { for _, err = range errs { @@ -1134,6 +1073,59 @@ Remove files or directories. }, } +func removePath(filesRoot *mfs.Root, path string, force bool, dashr bool) error { + if path == "/" { + return fmt.Errorf("cannot delete root") + } + + // 'rm a/b/c/' will fail unless we trim the slash at the end + if path[len(path)-1] == '/' { + path = path[:len(path)-1] + } + + dir, name := gopath.Split(path) + + pdir, err := getParentDir(filesRoot, dir) + if err != nil { + if force && err == os.ErrNotExist { + return nil + } + return err + } + + if force { + err := pdir.Unlink(name) + if err != nil { + if err == os.ErrNotExist { + return nil + } + return err + } + return pdir.Flush() + } + + // get child node by name, when the node is corrupted and nonexistent, + // it will return specific error. 
+ child, err := pdir.Child(name) + if err != nil { + return err + } + + switch child.(type) { + case *mfs.Directory: + if !dashr { + return fmt.Errorf("path is a directory, use -r to remove directories") + } + } + + err = pdir.Unlink(name) + if err != nil { + return err + } + + return pdir.Flush() +} + func getPrefixNew(req *cmds.Request) (cid.Builder, error) { cidVer, cidVerSet := req.Options[filesCidVersionOptionName].(int) hashFunStr, hashFunSet := req.Options[filesHashOptionName].(string) From d0e05959f731cb8900dbdc44ff0654f6508cbda0 Mon Sep 17 00:00:00 2001 From: Lucas Molas Date: Mon, 20 Sep 2021 09:14:31 -0300 Subject: [PATCH 2/2] feat(core): protect MFS root in node with lock --- cmd/ipfs/pinmfs.go | 6 +++- core/commands/files.go | 69 ++++++++++++++++++++++++++++++++---------- core/core.go | 13 ++++++-- core/corerepo/gc.go | 8 +++-- core/node/core.go | 52 +++++++++++++++++++++++++++++++ core/node/groups.go | 1 + test-mfs-lock.bash | 13 ++++++++ 7 files changed, 141 insertions(+), 21 deletions(-) create mode 100755 test-mfs-lock.bash diff --git a/cmd/ipfs/pinmfs.go b/cmd/ipfs/pinmfs.go index 8ea9d2dcc8e..93b5afd90da 100644 --- a/cmd/ipfs/pinmfs.go +++ b/cmd/ipfs/pinmfs.go @@ -50,7 +50,11 @@ type ipfsPinMFSNode struct { } func (x *ipfsPinMFSNode) RootNode() (ipld.Node, error) { - return x.node.FilesRoot.GetDirectory().GetNode() + // FIXME(BLOCKING): We're deploying this lock taking mechanism as simply as + // possible for now deferring the unlock but we should be more precise. 
+ filesRoot := x.node.LockedFilesRoot.RLock() + defer x.node.LockedFilesRoot.RUnlock() + return filesRoot.GetDirectory().GetNode() } func (x *ipfsPinMFSNode) Identity() peer.ID { diff --git a/core/commands/files.go b/core/commands/files.go index a0adc754f9a..5c8ead5a7c9 100644 --- a/core/commands/files.go +++ b/core/commands/files.go @@ -9,10 +9,12 @@ import ( gopath "path" "sort" "strings" + "time" humanize "github.com/dustin/go-humanize" "github.com/ipfs/go-ipfs/core" "github.com/ipfs/go-ipfs/core/commands/cmdenv" + "github.com/ipfs/go-ipfs/core/node" bservice "github.com/ipfs/go-blockservice" cid "github.com/ipfs/go-cid" @@ -398,20 +400,23 @@ GC'ed. return fmt.Errorf("cp: cannot get node from path %s: %s", src, err) } + filesRoot := nd.LockedFilesRoot.Lock() + defer nd.LockedFilesRoot.Unlock() + if mkParents { - err := ensureContainingDirectoryExists(nd.FilesRoot, dst, prefix) + err := ensureContainingDirectoryExists(filesRoot, dst, prefix) if err != nil { return err } } - err = mfs.PutNode(nd.FilesRoot, dst, node) + err = mfs.PutNode(filesRoot, dst, node) if err != nil { return fmt.Errorf("cp: cannot put node in path %s: %s", dst, err) } if flush { - _, err := mfs.FlushPath(req.Context, nd.FilesRoot, dst) + _, err := mfs.FlushPath(req.Context, filesRoot, dst) if err != nil { return fmt.Errorf("cp: cannot flush the created file %s: %s", dst, err) } @@ -426,7 +431,9 @@ func getNodeFromPath(ctx context.Context, node *core.IpfsNode, api iface.CoreAPI case strings.HasPrefix(p, "/ipfs/"): return api.ResolveNode(ctx, path.New(p)) default: - fsn, err := mfs.Lookup(node.FilesRoot, p) + filesRoot := node.LockedFilesRoot.RLock() + defer node.LockedFilesRoot.RUnlock() + fsn, err := mfs.Lookup(filesRoot, p) if err != nil { return nil, err } @@ -491,7 +498,10 @@ Examples: return err } - fsn, err := mfs.Lookup(nd.FilesRoot, path) + filesRoot := nd.LockedFilesRoot.RLock() + defer nd.LockedFilesRoot.RUnlock() + + fsn, err := mfs.Lookup(filesRoot, path) if err != nil { return 
err } @@ -611,7 +621,10 @@ Examples: return err } - fsn, err := mfs.Lookup(nd.FilesRoot, path) + filesRoot := nd.LockedFilesRoot.RLock() + defer nd.LockedFilesRoot.RUnlock() + + fsn, err := mfs.Lookup(filesRoot, path) if err != nil { return err } @@ -706,9 +719,12 @@ Example: return err } - err = mfs.Mv(nd.FilesRoot, src, dst) + filesRoot := nd.LockedFilesRoot.Lock() + defer nd.LockedFilesRoot.Unlock() + + err = mfs.Mv(filesRoot, src, dst) if err == nil && flush { - _, err = mfs.FlushPath(req.Context, nd.FilesRoot, "/") + _, err = mfs.FlushPath(req.Context, filesRoot, "/") } return err }, @@ -768,6 +784,9 @@ stat' on the file or any of its ancestors. cmds.BoolOption(filesTruncateOptionName, "t", "Truncate the file to size zero before writing."), cmds.Int64Option(filesCountOptionName, "n", "Maximum number of bytes to read."), cmds.BoolOption(filesRawLeavesOptionName, "Use raw blocks for newly created leaf nodes. (experimental)"), + // FIXME(BLOCKING): Remove. + // Fake lock for manual testing. + cmds.IntOption("lock-time", "lt", "[TESTING] timeout to hold the MFS lock while copying").WithDefault(0), cidVersionOption, hashOption, }, @@ -798,14 +817,21 @@ stat' on the file or any of its ancestors. return fmt.Errorf("cannot have negative write offset") } + filesRoot := nd.LockedFilesRoot.Lock() + defer nd.LockedFilesRoot.Unlock() + // FIXME(BLOCKING): Remove. + // Keep the hold for a fake, arbitrary amount of time to test it manually. 
+ timeout, _ := req.Options["lock-time"].(int) + defer time.Sleep(time.Duration(timeout) * time.Second) + if mkParents { - err := ensureContainingDirectoryExists(nd.FilesRoot, path, prefix) + err := ensureContainingDirectoryExists(filesRoot, path, prefix) if err != nil { return err } } - fi, err := getFileHandle(nd.FilesRoot, path, create, prefix) + fi, err := getFileHandle(filesRoot, path, create, prefix) if err != nil { return err } @@ -904,7 +930,9 @@ Examples: if err != nil { return err } - root := n.FilesRoot + + root := n.LockedFilesRoot.Lock() + defer n.LockedFilesRoot.Unlock() err = mfs.Mkdir(root, dirtomake, mfs.MkdirOpts{ Mkparents: dashp, @@ -947,7 +975,10 @@ are run with the '--flush=false'. path = req.Arguments[0] } - n, err := mfs.FlushPath(req.Context, nd.FilesRoot, path) + filesRoot := nd.LockedFilesRoot.Lock() + defer nd.LockedFilesRoot.Unlock() + + n, err := mfs.FlushPath(req.Context, filesRoot, path) if err != nil { return err } @@ -989,9 +1020,12 @@ Change the CID version or hash function of the root node of a given path. return err } - err = updatePath(nd.FilesRoot, path, prefix) + filesRoot := nd.LockedFilesRoot.Lock() + defer nd.LockedFilesRoot.Unlock() + + err = updatePath(filesRoot, path, prefix) if err == nil && flush { - _, err = mfs.FlushPath(req.Context, nd.FilesRoot, path) + _, err = mfs.FlushPath(req.Context, filesRoot, path) } return err }, @@ -1056,7 +1090,7 @@ Remove files or directories. continue } - if err := removePath(nd.FilesRoot, path, force, dashr); err != nil { + if err := removePath(nd.LockedFilesRoot, path, force, dashr); err != nil { errs = append(errs, fmt.Errorf("%s: %w", path, err)) } } @@ -1073,7 +1107,7 @@ Remove files or directories. 
}, } -func removePath(filesRoot *mfs.Root, path string, force bool, dashr bool) error { +func removePath(lockedFilesRoot *node.LockedFilesRoot, path string, force bool, dashr bool) error { if path == "/" { return fmt.Errorf("cannot delete root") } @@ -1085,6 +1119,9 @@ func removePath(filesRoot *mfs.Root, path string, force bool, dashr bool) error dir, name := gopath.Split(path) + filesRoot := lockedFilesRoot.Lock() + defer lockedFilesRoot.Unlock() + pdir, err := getParentDir(filesRoot, dir) if err != nil { if force && err == os.ErrNotExist { diff --git a/core/core.go b/core/core.go index 888d3d78013..32d75fa64fb 100644 --- a/core/core.go +++ b/core/core.go @@ -24,7 +24,6 @@ import ( "github.com/ipfs/go-ipfs-provider" ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log" - mfs "github.com/ipfs/go-mfs" goprocess "github.com/jbenet/goprocess" connmgr "github.com/libp2p/go-libp2p-core/connmgr" ic "github.com/libp2p/go-libp2p-core/crypto" @@ -81,7 +80,17 @@ type IpfsNode struct { UnixFSFetcherFactory fetcher.Factory `name:"unixfsFetcher"` // fetcher that interprets UnixFS data Reporter *metrics.BandwidthCounter `optional:"true"` Discovery mdns.Service `optional:"true"` - FilesRoot *mfs.Root + // FIXME(BLOCKING): Decided on this. + // Temporarily masking FilesRoot to make sure we're covering *all* MFS accesses. 
+ // We should evaluate later if we either want to: + // * leave it in place for backward compatibility (this would be very risky + // given we would be exposing an unlocked version of the MFS root) + // * remove it completely (breaking interface) in favor of LockedFilesRoot; + // this would seem the cleaner option + // * some in-between like renaming LockedFilesRoot to the original FilesRoot + // (which would still be breaking the API and could even be more confusing) + // FilesRoot *mfs.Root + LockedFilesRoot *node.LockedFilesRoot RecordValidator record.Validator // Online diff --git a/core/corerepo/gc.go b/core/corerepo/gc.go index 5db45bf477d..16a7b99c023 100644 --- a/core/corerepo/gc.go +++ b/core/corerepo/gc.go @@ -84,7 +84,9 @@ func BestEffortRoots(filesRoot *mfs.Root) ([]cid.Cid, error) { } func GarbageCollect(n *core.IpfsNode, ctx context.Context) error { - roots, err := BestEffortRoots(n.FilesRoot) + filesRoot := n.LockedFilesRoot.RLock() + defer n.LockedFilesRoot.RUnlock() + roots, err := BestEffortRoots(filesRoot) if err != nil { return err } @@ -148,7 +150,9 @@ func (e *MultiError) Error() string { } func GarbageCollectAsync(n *core.IpfsNode, ctx context.Context) <-chan gc.Result { - roots, err := BestEffortRoots(n.FilesRoot) + filesRoot := n.LockedFilesRoot.RLock() + defer n.LockedFilesRoot.RUnlock() + roots, err := BestEffortRoots(filesRoot) if err != nil { out := make(chan gc.Result) out <- gc.Result{Error: err} diff --git a/core/node/core.go b/core/node/core.go index c8305bb610a..d09b805865f 100644 --- a/core/node/core.go +++ b/core/node/core.go @@ -3,6 +3,7 @@ package node import ( "context" "fmt" + "sync" "github.com/ipfs/go-blockservice" "github.com/ipfs/go-cid" @@ -168,3 +169,54 @@ func Files(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format. return root, err } + +// FIXME(BLOCKING): This should probably be merged into the Files provider above +// as no one should have access to the unlocked version of the mfs.Root. 
+func LockedFiles(mctx helpers.MetricsCtx, lc fx.Lifecycle, root *mfs.Root) (*LockedFilesRoot, error) { + return &LockedFilesRoot{ + root: root, + lock: &sync.Mutex{}, + }, nil +} + +// We distinguish between locking for reading and writing independently of +// how the lock is actually implemented internally (we could be write-locking +// every time, using a RWMutex, or not even locking for reading). +// The objective is: +// > For reads we might want to be more careful than a global lock though. +// > For example, not locking ipfs files ls or ipfs files stat while GC is ongoing, +// > but preventing corruption between MFS write and read operations (out of date data +// > is fine with concurrent operations, but corruption would be bad). +// FIXME(BLOCKING): Decide how we are actually going to implement the locking +// distinction between reading and writing. +// FIXME(BLOCKING): Consider providing our own API functions like GetNode (and similar) that +// already take charge of lock-handling here instead of leaving that burden to the consumer. +// That way we could also make sure the lock is being enforced correctly. +type LockedFilesRoot struct { + root *mfs.Root + lock *sync.Mutex +} + +// FIXME(BLOCKING): Review and confirm. +// Not enforcing the lock for reading operations to see if something comes up +// in the tests. This is dangerous because MFS is not designed to clearly +// distinguish between read-only and mutating operations. For example, the FSNode +// interface, which is the result used from the most frequent "read" operation of +// doing a Lookup, (a) allows the Flush operation and (b) its File/Directory structures +// implementing it have their own locking mechanism to keep track of open file +// descriptors. 
+func (lfr *LockedFilesRoot) RLock() *mfs.Root { + return lfr.root +} + +func (lfr *LockedFilesRoot) RUnlock() { +} + +func (lfr *LockedFilesRoot) Lock() *mfs.Root { + lfr.lock.Lock() + return lfr.root +} + +func (lfr *LockedFilesRoot) Unlock() { + lfr.lock.Unlock() +} diff --git a/core/node/groups.go b/core/node/groups.go index 15b45fc8028..4235a9f7547 100644 --- a/core/node/groups.go +++ b/core/node/groups.go @@ -316,6 +316,7 @@ var Core = fx.Options( fx.Provide(FetcherConfig), fx.Provide(Pinning), fx.Provide(Files), + fx.Provide(LockedFiles), ) func Networked(bcfg *BuildCfg, cfg *config.Config) fx.Option { diff --git a/test-mfs-lock.bash b/test-mfs-lock.bash new file mode 100755 index 00000000000..18573b19981 --- /dev/null +++ b/test-mfs-lock.bash @@ -0,0 +1,13 @@ +set -x +set -v + +# The daemon needs to be running, otherwise the commands will compete on their +# lock of the entire repo (not the MFS root) and fail. +ipfs swarm addrs > /dev/null || (echo "daemon not running" && exit 1) + +ipfs --version +ipfs files mkdir /test-lock/ +ipfs files rm /test-lock/ -r +((echo "content" | ./cmd/ipfs/ipfs files write --create --parents --truncate --lock-time 3 /test-lock/file) && echo "ipfs write lock released" &) +ipfs repo gc +# FIXME: This is a flaky test just to manually check the lock in ipfs write is blocking the GC.