diff --git a/cmd/ipfs/pinmfs.go b/cmd/ipfs/pinmfs.go index 8ea9d2dcc8e..93b5afd90da 100644 --- a/cmd/ipfs/pinmfs.go +++ b/cmd/ipfs/pinmfs.go @@ -50,7 +50,11 @@ type ipfsPinMFSNode struct { } func (x *ipfsPinMFSNode) RootNode() (ipld.Node, error) { - return x.node.FilesRoot.GetDirectory().GetNode() + // FIXME(BLOCKING): We're deploying this lock taking mechanism as simply as + // possible for now deferring the unlock but we should be more precise. + filesRoot := x.node.LockedFilesRoot.RLock() + defer x.node.LockedFilesRoot.RUnlock() + return filesRoot.GetDirectory().GetNode() } func (x *ipfsPinMFSNode) Identity() peer.ID { diff --git a/core/commands/files.go b/core/commands/files.go index 3f7e3e6b9a9..5c8ead5a7c9 100644 --- a/core/commands/files.go +++ b/core/commands/files.go @@ -9,10 +9,12 @@ import ( gopath "path" "sort" "strings" + "time" humanize "github.com/dustin/go-humanize" "github.com/ipfs/go-ipfs/core" "github.com/ipfs/go-ipfs/core/commands/cmdenv" + "github.com/ipfs/go-ipfs/core/node" bservice "github.com/ipfs/go-blockservice" cid "github.com/ipfs/go-cid" @@ -398,20 +400,23 @@ GC'ed. 
return fmt.Errorf("cp: cannot get node from path %s: %s", src, err) } + filesRoot := nd.LockedFilesRoot.Lock() + defer nd.LockedFilesRoot.Unlock() + if mkParents { - err := ensureContainingDirectoryExists(nd.FilesRoot, dst, prefix) + err := ensureContainingDirectoryExists(filesRoot, dst, prefix) if err != nil { return err } } - err = mfs.PutNode(nd.FilesRoot, dst, node) + err = mfs.PutNode(filesRoot, dst, node) if err != nil { return fmt.Errorf("cp: cannot put node in path %s: %s", dst, err) } if flush { - _, err := mfs.FlushPath(req.Context, nd.FilesRoot, dst) + _, err := mfs.FlushPath(req.Context, filesRoot, dst) if err != nil { return fmt.Errorf("cp: cannot flush the created file %s: %s", dst, err) } @@ -426,7 +431,9 @@ func getNodeFromPath(ctx context.Context, node *core.IpfsNode, api iface.CoreAPI case strings.HasPrefix(p, "/ipfs/"): return api.ResolveNode(ctx, path.New(p)) default: - fsn, err := mfs.Lookup(node.FilesRoot, p) + filesRoot := node.LockedFilesRoot.RLock() + defer node.LockedFilesRoot.RUnlock() + fsn, err := mfs.Lookup(filesRoot, p) if err != nil { return nil, err } @@ -491,7 +498,10 @@ Examples: return err } - fsn, err := mfs.Lookup(nd.FilesRoot, path) + filesRoot := nd.LockedFilesRoot.RLock() + defer nd.LockedFilesRoot.RUnlock() + + fsn, err := mfs.Lookup(filesRoot, path) if err != nil { return err } @@ -611,7 +621,10 @@ Examples: return err } - fsn, err := mfs.Lookup(nd.FilesRoot, path) + filesRoot := nd.LockedFilesRoot.RLock() + defer nd.LockedFilesRoot.RUnlock() + + fsn, err := mfs.Lookup(filesRoot, path) if err != nil { return err } @@ -706,9 +719,12 @@ Example: return err } - err = mfs.Mv(nd.FilesRoot, src, dst) + filesRoot := nd.LockedFilesRoot.Lock() + defer nd.LockedFilesRoot.Unlock() + + err = mfs.Mv(filesRoot, src, dst) if err == nil && flush { - _, err = mfs.FlushPath(req.Context, nd.FilesRoot, "/") + _, err = mfs.FlushPath(req.Context, filesRoot, "/") } return err }, @@ -768,6 +784,9 @@ stat' on the file or any of its ancestors. 
cmds.BoolOption(filesTruncateOptionName, "t", "Truncate the file to size zero before writing."), cmds.Int64Option(filesCountOptionName, "n", "Maximum number of bytes to read."), cmds.BoolOption(filesRawLeavesOptionName, "Use raw blocks for newly created leaf nodes. (experimental)"), + // FIXME(BLOCKING): Remove. + // Fake lock for manual testing. + cmds.IntOption("lock-time", "lt", "[TESTING] timeout to hold the MFS lock while copying").WithDefault(0), cidVersionOption, hashOption, }, @@ -798,14 +817,21 @@ stat' on the file or any of its ancestors. return fmt.Errorf("cannot have negative write offset") } + filesRoot := nd.LockedFilesRoot.Lock() + defer nd.LockedFilesRoot.Unlock() + // FIXME(BLOCKING): Remove. + // Keep the hold for a fake, arbitrary amount of time to test it manually. + timeout, _ := req.Options["lock-time"].(int) + defer time.Sleep(time.Duration(timeout) * time.Second) + if mkParents { - err := ensureContainingDirectoryExists(nd.FilesRoot, path, prefix) + err := ensureContainingDirectoryExists(filesRoot, path, prefix) if err != nil { return err } } - fi, err := getFileHandle(nd.FilesRoot, path, create, prefix) + fi, err := getFileHandle(filesRoot, path, create, prefix) if err != nil { return err } @@ -904,7 +930,9 @@ Examples: if err != nil { return err } - root := n.FilesRoot + + root := n.LockedFilesRoot.Lock() + defer n.LockedFilesRoot.Unlock() err = mfs.Mkdir(root, dirtomake, mfs.MkdirOpts{ Mkparents: dashp, @@ -947,7 +975,10 @@ are run with the '--flush=false'. path = req.Arguments[0] } - n, err := mfs.FlushPath(req.Context, nd.FilesRoot, path) + filesRoot := nd.LockedFilesRoot.Lock() + defer nd.LockedFilesRoot.Unlock() + + n, err := mfs.FlushPath(req.Context, filesRoot, path) if err != nil { return err } @@ -989,9 +1020,12 @@ Change the CID version or hash function of the root node of a given path. 
return err } - err = updatePath(nd.FilesRoot, path, prefix) + filesRoot := nd.LockedFilesRoot.Lock() + defer nd.LockedFilesRoot.Unlock() + + err = updatePath(filesRoot, path, prefix) if err == nil && flush { - _, err = mfs.FlushPath(req.Context, nd.FilesRoot, path) + _, err = mfs.FlushPath(req.Context, filesRoot, path) } return err }, @@ -1052,74 +1086,13 @@ Remove files or directories. for _, arg := range req.Arguments { path, err := checkPath(arg) if err != nil { - errs = append(errs, fmt.Errorf("%s: %w", arg, err)) - continue - } - - if path == "/" { - errs = append(errs, fmt.Errorf("%s: cannot delete root", path)) - continue - } - - // 'rm a/b/c/' will fail unless we trim the slash at the end - if path[len(path)-1] == '/' { - path = path[:len(path)-1] - } - - dir, name := gopath.Split(path) - - pdir, err := getParentDir(nd.FilesRoot, dir) - if err != nil { - if force && err == os.ErrNotExist { - continue - } - errs = append(errs, fmt.Errorf("%s: parent lookup: %w", path, err)) - continue - } - - if force { - err := pdir.Unlink(name) - if err != nil { - if err == os.ErrNotExist { - continue - } - errs = append(errs, fmt.Errorf("%s: %w", path, err)) - continue - } - err = pdir.Flush() - if err != nil { - errs = append(errs, fmt.Errorf("%s: %w", path, err)) - } - continue - } - - // get child node by name, when the node is corrupted and nonexistent, - // it will return specific error. 
- child, err := pdir.Child(name) - if err != nil { - errs = append(errs, fmt.Errorf("%s: %w", path, err)) - continue - } - - switch child.(type) { - case *mfs.Directory: - if !dashr { - errs = append(errs, fmt.Errorf("%s is a directory, use -r to remove directories", path)) - continue - } - } - - err = pdir.Unlink(name) - if err != nil { - errs = append(errs, fmt.Errorf("%s: %w", path, err)) + errs = append(errs, fmt.Errorf("%s is not a valid path: %w", arg, err)) continue } - err = pdir.Flush() - if err != nil { + if err := removePath(nd.LockedFilesRoot, path, force, dashr); err != nil { errs = append(errs, fmt.Errorf("%s: %w", path, err)) } - continue } if len(errs) > 0 { for _, err = range errs { @@ -1134,6 +1107,62 @@ Remove files or directories. }, } +func removePath(lockedFilesRoot *node.LockedFilesRoot, path string, force bool, dashr bool) error { + if path == "/" { + return fmt.Errorf("cannot delete root") + } + + // 'rm a/b/c/' will fail unless we trim the slash at the end + if path[len(path)-1] == '/' { + path = path[:len(path)-1] + } + + dir, name := gopath.Split(path) + + filesRoot := lockedFilesRoot.Lock() + defer lockedFilesRoot.Unlock() + + pdir, err := getParentDir(filesRoot, dir) + if err != nil { + if force && err == os.ErrNotExist { + return nil + } + return err + } + + if force { + err := pdir.Unlink(name) + if err != nil { + if err == os.ErrNotExist { + return nil + } + return err + } + return pdir.Flush() + } + + // get child node by name, when the node is corrupted and nonexistent, + // it will return specific error. 
+ child, err := pdir.Child(name) + if err != nil { + return err + } + + switch child.(type) { + case *mfs.Directory: + if !dashr { + return fmt.Errorf("path is a directory, use -r to remove directories") + } + } + + err = pdir.Unlink(name) + if err != nil { + return err + } + + return pdir.Flush() +} + func getPrefixNew(req *cmds.Request) (cid.Builder, error) { cidVer, cidVerSet := req.Options[filesCidVersionOptionName].(int) hashFunStr, hashFunSet := req.Options[filesHashOptionName].(string) diff --git a/core/core.go b/core/core.go index 888d3d78013..32d75fa64fb 100644 --- a/core/core.go +++ b/core/core.go @@ -24,7 +24,6 @@ import ( "github.com/ipfs/go-ipfs-provider" ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log" - mfs "github.com/ipfs/go-mfs" goprocess "github.com/jbenet/goprocess" connmgr "github.com/libp2p/go-libp2p-core/connmgr" ic "github.com/libp2p/go-libp2p-core/crypto" @@ -81,7 +80,17 @@ type IpfsNode struct { UnixFSFetcherFactory fetcher.Factory `name:"unixfsFetcher"` // fetcher that interprets UnixFS data Reporter *metrics.BandwidthCounter `optional:"true"` Discovery mdns.Service `optional:"true"` - FilesRoot *mfs.Root + // FIXME(BLOCKING): Decide on this. + // Temporarily masking FilesRoot to make sure we're covering *all* MFS accesses. 
+ // We should evaluate later if we either want to: + // * leave it in place for backward compatibility (this would be very risky + // given we would be exposing an unlocked version of the MFS root) + // * remove it completely (breaking interface) in favor of LockedFilesRoot; + // this would seem the cleaner option + // * some in-between like renaming LockedFilesRoot to the original FilesRoot + // (which would still be breaking the API and could even be more confusing) + // FilesRoot *mfs.Root + LockedFilesRoot *node.LockedFilesRoot RecordValidator record.Validator // Online diff --git a/core/corerepo/gc.go b/core/corerepo/gc.go index 5db45bf477d..16a7b99c023 100644 --- a/core/corerepo/gc.go +++ b/core/corerepo/gc.go @@ -84,7 +84,9 @@ func BestEffortRoots(filesRoot *mfs.Root) ([]cid.Cid, error) { } func GarbageCollect(n *core.IpfsNode, ctx context.Context) error { - roots, err := BestEffortRoots(n.FilesRoot) + filesRoot := n.LockedFilesRoot.RLock() + defer n.LockedFilesRoot.RUnlock() + roots, err := BestEffortRoots(filesRoot) if err != nil { return err } @@ -148,7 +150,9 @@ func (e *MultiError) Error() string { } func GarbageCollectAsync(n *core.IpfsNode, ctx context.Context) <-chan gc.Result { - roots, err := BestEffortRoots(n.FilesRoot) + filesRoot := n.LockedFilesRoot.RLock() + defer n.LockedFilesRoot.RUnlock() + roots, err := BestEffortRoots(filesRoot) if err != nil { out := make(chan gc.Result) out <- gc.Result{Error: err} diff --git a/core/node/core.go b/core/node/core.go index c8305bb610a..d09b805865f 100644 --- a/core/node/core.go +++ b/core/node/core.go @@ -3,6 +3,7 @@ package node import ( "context" "fmt" + "sync" "github.com/ipfs/go-blockservice" "github.com/ipfs/go-cid" @@ -168,3 +169,54 @@ func Files(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format. 
return root, err } + +// FIXME(BLOCKING): This should probably be merged into the Files provider above +// as no one should have access to the unlocked version of the mfs.Root. +func LockedFiles(mctx helpers.MetricsCtx, lc fx.Lifecycle, root *mfs.Root) (*LockedFilesRoot, error) { + return &LockedFilesRoot{ + root: root, + lock: &sync.Mutex{}, + }, nil +} + +// We distinguish between locking for reading and writing independently of +// how the lock is actually implemented internally (we could be write-locking +// every time, using a RWMutex, or not even locking for reading). +// The objective is: +// > For reads we might want to be more careful than a global lock though. +// > For example, not locking ipfs files ls or ipfs files stat while GC is ongoing, +// > but preventing corruption between MFS write and read operations (out of date data +// > is fine with concurrent operations, but corruption would be bad). +// FIXME(BLOCKING): Decide how we are actually going to implement the locking +// distinction between reading and writing. +// FIXME(BLOCKING): Consider providing our own API functions like GetNode (and similar) that +// already take charge of lock-handling here instead of leaving that burden to the consumer. +// That way we could also make sure the lock is being enforced correctly. +type LockedFilesRoot struct { + root *mfs.Root + lock *sync.Mutex +} + +// FIXME(BLOCKING): Review and confirm. +// Not enforcing the lock for reading operations to see if something comes up +// in the tests. This is dangerous because MFS is not designed to clearly +// distinguish between read-only and mutating operations. For example, the FSNode +// interface, which is the result used from the most frequent "read" operation of +// doing a Lookup, (a) allows the Flush operation and (b) its File/Directory structures +// implementing it have their own locking mechanism to keep track of open file +// descriptors. 
+func (lfr *LockedFilesRoot) RLock() *mfs.Root { + return lfr.root +} + +func (lfr *LockedFilesRoot) RUnlock() { +} + +func (lfr *LockedFilesRoot) Lock() *mfs.Root { + lfr.lock.Lock() + return lfr.root +} + +func (lfr *LockedFilesRoot) Unlock() { + lfr.lock.Unlock() +} diff --git a/core/node/groups.go b/core/node/groups.go index 15b45fc8028..4235a9f7547 100644 --- a/core/node/groups.go +++ b/core/node/groups.go @@ -316,6 +316,7 @@ var Core = fx.Options( fx.Provide(FetcherConfig), fx.Provide(Pinning), fx.Provide(Files), + fx.Provide(LockedFiles), ) func Networked(bcfg *BuildCfg, cfg *config.Config) fx.Option { diff --git a/test-mfs-lock.bash b/test-mfs-lock.bash new file mode 100755 index 00000000000..18573b19981 --- /dev/null +++ b/test-mfs-lock.bash @@ -0,0 +1,13 @@ +set -x +set -v + +# The daemon needs to be running, otherwise the commands will compete on their +# lock of the entire repo (not the MFS root) and fail. +ipfs swarm addrs > /dev/null || (echo "daemon not running" && exit 1) + +ipfs --version +ipfs files mkdir /test-lock/ +ipfs files rm /test-lock/ -r +((echo "content" | ./cmd/ipfs/ipfs files write --create --parents --truncate --lock-time 3 /test-lock/file) && echo "ipfs write lock released" &) +ipfs repo gc +# FIXME: This is a flaky test just to manually check the lock in ipfs write is blocking the GC.