From 719e536c55dc814b9dba151f34ce1100dfa9f3b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Wed, 8 Mar 2023 11:14:31 +0100 Subject: [PATCH] Use storage-relative paths as the cache key That prevents non-aligned cache keys when multiple providers mount the storage at a different location. --- changelog/unreleased/fix-ini-backend.md | 5 + pkg/storage/cache/cache.go | 1 + pkg/storage/cache/filemetadata.go | 16 +++ .../utils/decomposedfs/decomposedfs.go | 2 +- .../decomposedfs_concurrency_test.go | 20 +++ .../utils/decomposedfs/lookup/lookup.go | 57 +++----- .../decomposedfs/metadata/ini_backend.go | 131 ++++++++++-------- .../decomposedfs/metadata/metadata_test.go | 27 ++-- pkg/storage/utils/decomposedfs/node/node.go | 4 +- .../utils/decomposedfs/node/node_test.go | 2 +- pkg/storage/utils/decomposedfs/spaces.go | 7 +- .../utils/decomposedfs/testhelpers/helpers.go | 2 +- pkg/storage/utils/decomposedfs/tree/tree.go | 69 +++++---- .../utils/decomposedfs/upload/processing.go | 59 ++++---- 14 files changed, 242 insertions(+), 160 deletions(-) create mode 100644 changelog/unreleased/fix-ini-backend.md diff --git a/changelog/unreleased/fix-ini-backend.md b/changelog/unreleased/fix-ini-backend.md new file mode 100644 index 0000000000..219a2361c7 --- /dev/null +++ b/changelog/unreleased/fix-ini-backend.md @@ -0,0 +1,5 @@ +Bugfix: Fix ini backend + +We fixed a few issues in the ini backend regarding encoding and cache invalidation. + +https://github.com/cs3org/reva/pull/3711 diff --git a/pkg/storage/cache/cache.go b/pkg/storage/cache/cache.go index ae96770138..0f1f507897 100644 --- a/pkg/storage/cache/cache.go +++ b/pkg/storage/cache/cache.go @@ -85,6 +85,7 @@ type CreatePersonalSpaceCache interface { // FileMetadataCache handles file metadata type FileMetadataCache interface { Cache + RemoveMetadata(path string) error } // GetStatCache will return an existing StatCache for the given store, nodes, database and table diff --git a/pkg/storage/cache/filemetadata.go b/pkg/storage/cache/filemetadata.go index 0966b40e47..1dc6866a7a 100644 --- a/pkg/storage/cache/filemetadata.go +++ b/pkg/storage/cache/filemetadata.go @@ -19,6 +19,7 @@ package cache import ( + "strings" "time" ) @@ -36,3 +37,18 @@ func NewFileMetadataCache(store string, nodes []string, database, table string, return c } + +// RemoveMetadata removes a reference from the metadata cache +func (c *fileMetadataCache) RemoveMetadata(path string) error { + keys, err := c.List() + if err != nil { + return err + } + + for _, key := range keys { + if strings.HasPrefix(key, path) { + _ = c.Delete(key) + } + } + return nil +} diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go index 83d3beeab5..0f6a988f4a 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -108,7 +108,7 @@ func NewDefault(m map[string]interface{}, bs tree.Blobstore, es events.Stream) ( case "xattrs": lu = lookup.New(metadata.XattrsBackend{}, o) case "ini": - lu = lookup.New(metadata.NewIniBackend(o.FileMetadataCache), o) + lu = lookup.New(metadata.NewIniBackend(o.Root, o.FileMetadataCache), o) default: return nil, fmt.Errorf("unknown metadata backend %s, only 'ini' or 'xattrs' (default) supported", o.MetadataBackend) } diff --git a/pkg/storage/utils/decomposedfs/decomposedfs_concurrency_test.go b/pkg/storage/utils/decomposedfs/decomposedfs_concurrency_test.go index 6a6d8643e0..55999537c3 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs_concurrency_test.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs_concurrency_test.go @@ -112,6 +112,26 @@ var _ = Describe("Decomposed", func() { woFile.Close() Eventually(func() bool { s, _ := states.Load("managedToOpenroFile"); return s.(bool) }).Should(BeTrue()) }) + + It("allows opening rw while an exclusive lock is being held", func() { + states := sync.Map{} + + path, err := os.CreateTemp("", "decomposedfs-lockedfile-test-") + Expect(err).ToNot(HaveOccurred()) + states.Store("managedToOpenrwLockedFile", false) + woFile, err := lockedfile.OpenFile(path.Name(), os.O_WRONLY, 0) + Expect(err).ToNot(HaveOccurred()) + defer woFile.Close() + go func() { + roFile, err := os.OpenFile(path.Name(), os.O_RDWR, 0) + Expect(err).ToNot(HaveOccurred()) + defer roFile.Close() + states.Store("managedToOpenrwLockedFile", true) + }() + + woFile.Close() + Eventually(func() bool { s, _ := states.Load("managedToOpenrwLockedFile"); return s.(bool) }).Should(BeTrue()) + }) }) }) diff --git a/pkg/storage/utils/decomposedfs/lookup/lookup.go b/pkg/storage/utils/decomposedfs/lookup/lookup.go index e546c0cfe6..4769dcc0b4 100644 --- a/pkg/storage/utils/decomposedfs/lookup/lookup.go +++ b/pkg/storage/utils/decomposedfs/lookup/lookup.go @@ -33,9 +33,8 @@ import ( "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" - "github.com/cs3org/reva/v2/pkg/storage/utils/filelocks" - "github.com/gofrs/flock" "github.com/pkg/errors" + "github.com/rogpeppe/go-internal/lockedfile" ) // Lookup implements transformations from filepath to node and back @@ -277,18 +276,20 @@ func refFromCS3(b []byte) (*provider.Reference, error) { // CopyMetadata copies all extended attributes from source to target. // The optional filter function can be used to filter by attribute name, e.g. by checking a prefix // For the source file, a shared lock is acquired. -// NOTE: target resource is not locked! You need to acquire a write lock on the target additionally +// NOTE: target resource will be write locked! func (lu *Lookup) CopyMetadata(src, target string, filter func(attributeName string) bool) (err error) { - var readLock *flock.Flock - // Acquire a read log on the source node - readLock, err = filelocks.AcquireReadLock(src) + // write lock existing node before reading treesize or tree time + f, err := lockedfile.Open(lu.MetadataBackend().MetadataPath(src)) + if err != nil { + return err + } if err != nil { return errors.Wrap(err, "xattrs: Unable to lock source to read") } defer func() { - rerr := filelocks.ReleaseLock(readLock) + rerr := f.Close() // if err is non nil we do not overwrite that if err == nil { @@ -296,50 +297,32 @@ func (lu *Lookup) CopyMetadata(src, target string, filter func(attributeName str } }() - return lu.CopyMetadataWithSourceLock(src, target, filter, readLock) + return lu.CopyMetadataWithSourceLock(src, target, filter, f) } // CopyMetadataWithSourceLock copies all extended attributes from source to target. // The optional filter function can be used to filter by attribute name, e.g. by checking a prefix -// For the source file, a shared lock is acquired. -// NOTE: target resource is not locked! You need to acquire a write lock on the target additionally -func (lu *Lookup) CopyMetadataWithSourceLock(src, target string, filter func(attributeName string) bool, readLock *flock.Flock) (err error) { +// For the source file, a matching lockedfile is required. +// NOTE: target resource will be write locked! +func (lu *Lookup) CopyMetadataWithSourceLock(source, target string, filter func(attributeName string) bool, readLock *lockedfile.File) (err error) { switch { case readLock == nil: return errors.New("no lock provided") - case readLock.Path() != filelocks.FlockFile(src): + case readLock.File.Name() != lu.MetadataBackend().MetadataPath(source): return errors.New("lockpath does not match filepath") - case !readLock.Locked() && !readLock.RLocked(): // we need either a read or a write lock - return errors.New("not locked") } - // both locks are established. Copy. - var attrNameList []string - if attrNameList, err = lu.metadataBackend.List(src); err != nil { - return errors.Wrap(err, "Can not get xattr listing on src") + attrs, err := lu.metadataBackend.All(source) + if err != nil { + return err } - // error handling: We count errors of reads or writes of xattrs. - // if there were any read or write errors an error is returned. - var ( - xerrs = 0 - xerr error - ) - for idx := range attrNameList { - attrName := attrNameList[idx] + newAttrs := make(map[string]string, 0) + for attrName, val := range attrs { if filter == nil || filter(attrName) { - var attrVal string - if attrVal, xerr = lu.metadataBackend.Get(src, attrName); xerr != nil { - xerrs++ - } - if xerr = lu.metadataBackend.Set(target, attrName, attrVal); xerr != nil { - xerrs++ - } + newAttrs[attrName] = val } } - if xerrs > 0 { - err = errors.Wrap(xerr, "failed to copy all xattrs, last error returned") - } - return err + return lu.MetadataBackend().SetMultiple(target, newAttrs, true) } diff --git a/pkg/storage/utils/decomposedfs/metadata/ini_backend.go b/pkg/storage/utils/decomposedfs/metadata/ini_backend.go index 6e96a84a75..bc68c0470f 100644 --- a/pkg/storage/utils/decomposedfs/metadata/ini_backend.go +++ b/pkg/storage/utils/decomposedfs/metadata/ini_backend.go @@ -22,20 +22,26 @@ import ( "encoding/base64" "io" "os" + "path/filepath" "strconv" "strings" "time" + "unicode" "github.com/cs3org/reva/v2/pkg/storage/cache" - "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" "github.com/pkg/xattr" "github.com/rogpeppe/go-internal/lockedfile" "gopkg.in/ini.v1" ) +func init() { + ini.PrettyFormat = false // Disable alignment of values for performance reasons +} + // IniBackend persists the attributes in INI format inside the file type IniBackend struct { + rootPath string metaCache cache.FileMetadataCache } @@ -45,11 +51,11 @@ type readWriteCloseSeekTruncater interface { Truncate(int64) error } -var encodedPrefixes = []string{prefixes.ChecksumPrefix, prefixes.MetadataPrefix, prefixes.GrantPrefix} - // NewIniBackend returns a new IniBackend instance -func NewIniBackend(o options.CacheOptions) IniBackend { +func NewIniBackend(rootPath string, o options.CacheOptions) IniBackend { + ini.PrettyFormat = false return IniBackend{ + rootPath: filepath.Clean(rootPath), metaCache: cache.GetFileMetadataCache(o.CacheStore, o.CacheNodes, o.CacheDatabase, "filemetadata", 24*time.Hour), } } @@ -142,6 +148,9 @@ func (b IniBackend) saveIni(path string, setAttribs map[string]string, deleteAtt } defer f.Close() + // Invalidate cache early + _ = b.metaCache.RemoveMetadata(path) + // Read current state iniBytes, err := io.ReadAll(f) if err != nil { @@ -153,7 +162,7 @@ func (b IniBackend) saveIni(path string, setAttribs map[string]string, deleteAtt } // Prepare new metadata - iniAttribs, err := decodedAttribs(iniFile) + iniAttribs, err := decodeAttribs(iniFile) if err != nil { return err } @@ -179,13 +188,7 @@ func (b IniBackend) saveIni(path string, setAttribs map[string]string, deleteAtt if err != nil { return err } - for key, val := range iniAttribs { - for _, prefix := range encodedPrefixes { - if strings.HasPrefix(key, prefix) { - val = base64.StdEncoding.EncodeToString([]byte(val)) - break - } - } + for key, val := range encodeAttribs(iniAttribs) { ini.Section("").Key(key).SetValue(val) } _, err = ini.WriteTo(f) @@ -193,29 +196,20 @@ func (b IniBackend) saveIni(path string, setAttribs map[string]string, deleteAtt return err } - return b.metaCache.PushToCache(path, iniAttribs) + return b.metaCache.PushToCache(b.cacheKey(path), iniAttribs) } func (b IniBackend) loadMeta(path string) (map[string]string, error) { var attribs map[string]string - err := b.metaCache.PullFromCache(path, &attribs) + err := b.metaCache.PullFromCache(b.cacheKey(path), &attribs) if err == nil { return attribs, err } - var iniFile *ini.File - f, err := os.ReadFile(path) - length := len(f) + lockedFile, err := lockedfile.Open(path) - // Try to read the file without getting a lock first. We will either - // get the old or the new state or an empty byte array when the file - // was just truncated by a writer. - if err == nil && length > 0 { - iniFile, err = ini.Load(f) - if err != nil { - return nil, err - } - } else { + // // No cached entry found. Read from storage and store in cache + if err != nil { if os.IsNotExist(err) { // some of the caller rely on ENOTEXISTS to be returned when the // actual file (not the metafile) does not exist in order to @@ -226,26 +220,20 @@ func (b IniBackend) loadMeta(path string) (map[string]string, error) { } return attribs, nil // no attributes set yet } + } + defer lockedFile.Close() - // // No cached entry found. Read from storage and store in cache - lockedFile, err := lockedfile.Open(path) - if err != nil { - return nil, err - } - defer lockedFile.Close() - - iniFile, err = ini.Load(lockedFile) - if err != nil { - return nil, err - } + iniFile, err := ini.Load(lockedFile) + if err != nil { + return nil, err } - attribs, err = decodedAttribs(iniFile) + attribs, err = decodeAttribs(iniFile) if err != nil { return nil, err } - err = b.metaCache.PushToCache(path, attribs) + err = b.metaCache.PushToCache(b.cacheKey(path), attribs) if err != nil { return nil, err } @@ -258,7 +246,7 @@ func (IniBackend) IsMetaFile(path string) bool { return strings.HasSuffix(path, // Purge purges the data of a given path func (b IniBackend) Purge(path string) error { - if err := b.metaCache.Delete(path); err != nil { + if err := b.metaCache.RemoveMetadata(b.cacheKey(path)); err != nil { return err } return os.Remove(b.MetadataPath(path)) @@ -267,12 +255,12 @@ func (b IniBackend) Purge(path string) error { // Rename moves the data for a given path to a new path func (b IniBackend) Rename(oldPath, newPath string) error { data := map[string]string{} - _ = b.metaCache.PullFromCache(oldPath, &data) - err := b.metaCache.Delete(oldPath) + _ = b.metaCache.PullFromCache(b.cacheKey(oldPath), &data) + err := b.metaCache.RemoveMetadata(b.cacheKey(oldPath)) if err != nil { return err } - err = b.metaCache.PushToCache(newPath, data) + err = b.metaCache.PushToCache(b.cacheKey(newPath), data) if err != nil { return err } @@ -283,19 +271,54 @@ func (b IniBackend) Rename(oldPath, newPath string) error { // MetadataPath returns the path of the file holding the metadata for the given path func (IniBackend) MetadataPath(path string) string { return path + ".ini" } -func decodedAttribs(iniFile *ini.File) (map[string]string, error) { - attribs := iniFile.Section("").KeysHash() +func (b IniBackend) cacheKey(path string) string { + // rootPath is guaranteed to have no trailing slash + // the cache key shouldn't begin with a slash as some stores drop it which can cause + // confusion + return strings.TrimPrefix(path, b.rootPath+"/") +} + +func needsEncoding(s []byte) bool { + if len(s) == 0 { + return false + } + + if s[0] == '\'' || s[0] == '"' || s[0] == '`' { + return true + } + + for i := 0; i < len(s); i++ { + if s[i] < 32 || s[i] >= unicode.MaxASCII { // ASCII 127 = Del - we don't want that + return true + } + } + return false +} + +func encodeAttribs(attribs map[string]string) map[string]string { + encAttribs := map[string]string{} for key, val := range attribs { - for _, prefix := range encodedPrefixes { - if strings.HasPrefix(key, prefix) { - valBytes, err := base64.StdEncoding.DecodeString(val) - if err != nil { - return nil, err - } - attribs[key] = string(valBytes) - break + if needsEncoding([]byte(val)) { + encAttribs["base64:"+key] = base64.StdEncoding.EncodeToString([]byte(val)) + } else { + encAttribs[key] = val + } + } + return encAttribs +} + +func decodeAttribs(iniFile *ini.File) (map[string]string, error) { + decodedAttributes := map[string]string{} + for key, val := range iniFile.Section("").KeysHash() { + if strings.HasPrefix(key, "base64:") { + key = strings.TrimPrefix(key, "base64:") + valBytes, err := base64.StdEncoding.DecodeString(val) + if err != nil { + return nil, err } + val = string(valBytes) } + decodedAttributes[key] = val } - return attribs, nil + return decodedAttributes, nil } diff --git a/pkg/storage/utils/decomposedfs/metadata/metadata_test.go b/pkg/storage/utils/decomposedfs/metadata/metadata_test.go index a3459b14f9..d1fc4a04fd 100644 --- a/pkg/storage/utils/decomposedfs/metadata/metadata_test.go +++ b/pkg/storage/utils/decomposedfs/metadata/metadata_test.go @@ -59,7 +59,7 @@ var _ = Describe("Backend", func() { Describe("IniBackend", func() { BeforeEach(func() { - backend = metadata.NewIniBackend(options.CacheOptions{}) + backend = metadata.NewIniBackend(tmpdir, options.CacheOptions{}) }) Describe("Set", func() { @@ -89,23 +89,30 @@ var _ = Describe("Backend", func() { content, err := os.ReadFile(metafile) Expect(err).ToNot(HaveOccurred()) - Expect(string(content)).To(Equal("user.ocis.cs.foo = YmFy\n")) + Expect(string(content)).To(Equal("user.ocis.cs.foo = bar\n")) + + err = backend.Set(file, "user.ocis.cs.foo", string([]byte{200, 201, 202})) + Expect(err).ToNot(HaveOccurred()) + + content, err = os.ReadFile(metafile) + Expect(err).ToNot(HaveOccurred()) + Expect(string(content)).To(Equal("`base64:user.ocis.cs.foo` = yMnK\n")) }) It("doesn't encode already encoded attributes", func() { - err := backend.Set(file, "user.ocis.cs.foo", "bar") + err := backend.Set(file, "user.ocis.cs.foo", string([]byte{200, 201, 202})) Expect(err).ToNot(HaveOccurred()) content, err := os.ReadFile(metafile) Expect(err).ToNot(HaveOccurred()) - Expect(string(content)).To(Equal("user.ocis.cs.foo = YmFy\n")) + Expect(string(content)).To(Equal("`base64:user.ocis.cs.foo` = yMnK\n")) err = backend.Set(file, "user.something", "doesn'tmatter") Expect(err).ToNot(HaveOccurred()) content, err = os.ReadFile(metafile) Expect(err).ToNot(HaveOccurred()) - Expect(string(content)).To(ContainSubstring("user.ocis.cs.foo = YmFy\n")) + Expect(string(content)).To(ContainSubstring("`base64:user.ocis.cs.foo` = yMnK\n")) }) It("sets an empty attribute", func() { @@ -147,8 +154,8 @@ var _ = Describe("Backend", func() { It("encodes where needed", func() { err := backend.SetMultiple(file, map[string]string{ "user.ocis.something.foo": "bar", - "user.ocis.cs.foo": "bar", - "user.ocis.md.foo": "bar", + "user.ocis.cs.foo": string([]byte{200, 201, 202}), + "user.ocis.md.foo": string([]byte{200, 201, 202}), "user.ocis.grant.foo": "bar", }, true) Expect(err).ToNot(HaveOccurred()) @@ -157,9 +164,9 @@ var _ = Describe("Backend", func() { Expect(err).ToNot(HaveOccurred()) expected := []string{ "user.ocis.something.foo=bar", - "user.ocis.cs.foo=YmFy", - "user.ocis.md.foo=YmFy", - "user.ocis.grant.foo=YmFy"} + "`base64:user.ocis.cs.foo`=yMnK", + "`base64:user.ocis.md.foo`=yMnK", + "user.ocis.grant.foo=bar"} Expect(strings.Split(strings.ReplaceAll(strings.Trim(string(content), "\n"), " ", ""), "\n")).To(ConsistOf(expected)) }) }) diff --git a/pkg/storage/utils/decomposedfs/node/node.go b/pkg/storage/utils/decomposedfs/node/node.go index 23f671eb12..51712a7f0a 100644 --- a/pkg/storage/utils/decomposedfs/node/node.go +++ b/pkg/storage/utils/decomposedfs/node/node.go @@ -184,7 +184,7 @@ func (n *Node) ChangeOwner(new *userpb.UserId) (err error) { } // WriteAllNodeMetadata writes the Node metadata to disk -func (n *Node) WriteAllNodeMetadata() (err error) { +func (n *Node) WriteAllNodeMetadata(ctx context.Context) (err error) { attribs := make(map[string]string) attribs[prefixes.TypeAttr] = strconv.FormatInt(int64(n.Type()), 10) @@ -315,6 +315,8 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis n.Name = attrs[prefixes.NameAttr] n.ParentID = attrs[prefixes.ParentidAttr] if n.ParentID == "" { + d, _ := os.ReadFile(n.InternalPath() + ".ini") + appctx.GetLogger(ctx).Error().Str("nodeid", n.ID).Str("parentid", n.ParentID).Interface("attrs", attrs).Str("ini", string(d)).Msg("missing parent id") return nil, errtypes.InternalError("Missing parent ID on node") } // TODO why do we stat the parent? to determine if the current node is in the trash we would need to traverse all parents... diff --git a/pkg/storage/utils/decomposedfs/node/node_test.go b/pkg/storage/utils/decomposedfs/node/node_test.go index bd3771bc2d..6d3c99bbd6 100644 --- a/pkg/storage/utils/decomposedfs/node/node_test.go +++ b/pkg/storage/utils/decomposedfs/node/node_test.go @@ -94,7 +94,7 @@ var _ = Describe("Node", func() { n.BlobID = "TestBlobID" n.Blobsize = blobsize - err = n.WriteAllNodeMetadata() + err = n.WriteAllNodeMetadata(env.Ctx) Expect(err).ToNot(HaveOccurred()) n2, err := env.Lookup.NodeFromResource(env.Ctx, ref) Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/storage/utils/decomposedfs/spaces.go b/pkg/storage/utils/decomposedfs/spaces.go index 3a872b2fcd..5135aa71dc 100644 --- a/pkg/storage/utils/decomposedfs/spaces.go +++ b/pkg/storage/utils/decomposedfs/spaces.go @@ -102,7 +102,7 @@ func (fs *Decomposedfs) CreateStorageSpace(ctx context.Context, req *provider.Cr return nil, errors.Wrap(err, "Decomposedfs: error creating node") } - if err := root.WriteAllNodeMetadata(); err != nil { + if err := root.WriteAllNodeMetadata(ctx); err != nil { return nil, err } var owner *userv1beta1.UserId @@ -341,10 +341,15 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide for match := range matches { var err error + // TODO introduce metadata.IsLockFile(path) // do not investigate flock files any further. They indicate file locks but are not relevant here. if strings.HasSuffix(match, filelocks.LockFileSuffix) { continue } + // skip metadata files + if fs.lu.MetadataBackend().IsMetaFile(match) { + continue + } // always read link in case storage space id != node id spaceID, nodeID, err = ReadSpaceAndNodeFromIndexLink(match) if err != nil { diff --git a/pkg/storage/utils/decomposedfs/testhelpers/helpers.go b/pkg/storage/utils/decomposedfs/testhelpers/helpers.go index 3e2bb412f8..3e71933031 100644 --- a/pkg/storage/utils/decomposedfs/testhelpers/helpers.go +++ b/pkg/storage/utils/decomposedfs/testhelpers/helpers.go @@ -219,7 +219,7 @@ func (t *TestEnv) CreateTestFile(name, blobID, parentID, spaceID string, blobSiz if err != nil { return nil, err } - err = n.WriteAllNodeMetadata() + err = n.WriteAllNodeMetadata(context.Background()) if err != nil { return nil, err } diff --git a/pkg/storage/utils/decomposedfs/tree/tree.go b/pkg/storage/utils/decomposedfs/tree/tree.go index 48c6a79f78..aeb0f53f68 100644 --- a/pkg/storage/utils/decomposedfs/tree/tree.go +++ b/pkg/storage/utils/decomposedfs/tree/tree.go @@ -44,6 +44,7 @@ import ( "github.com/cs3org/reva/v2/pkg/utils" "github.com/google/uuid" "github.com/pkg/errors" + "github.com/rogpeppe/go-internal/lockedfile" "github.com/rs/zerolog/log" ) @@ -264,7 +265,7 @@ func (t *Tree) TouchFile(ctx context.Context, n *node.Node) error { return errors.Wrap(err, "Decomposedfs: error creating node") } - err = n.WriteAllNodeMetadata() + err = n.WriteAllNodeMetadata(ctx) if err != nil { return err } @@ -300,7 +301,7 @@ func (t *Tree) CreateDir(ctx context.Context, n *node.Node) (err error) { n.ID = uuid.New().String() } - err = t.createDirNode(n) + err = t.createDirNode(ctx, n) if err != nil { return } @@ -386,11 +387,11 @@ func (t *Tree) Move(ctx context.Context, oldNode *node.Node, newNode *node.Node) } // update target parentid and name - if err := oldNode.SetXattr(prefixes.ParentidAttr, newNode.ParentID); err != nil { - return errors.Wrap(err, "Decomposedfs: could not set parentid attribute") - } - if err := oldNode.SetXattr(prefixes.NameAttr, newNode.Name); err != nil { - return errors.Wrap(err, "Decomposedfs: could not set name attribute") + if err := oldNode.SetXattrs(map[string]string{ + prefixes.ParentidAttr: newNode.ParentID, + prefixes.NameAttr: newNode.Name, + }, true); err != nil { + return errors.Wrap(err, "Decomposedfs: could not update old node attributes") } // the size diff is the current treesize or blobsize of the old/source node @@ -616,16 +617,18 @@ func (t *Tree) RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPa } targetNode.Exists = true - // update name attribute - if err := recycleNode.SetXattr(prefixes.NameAttr, targetNode.Name); err != nil { - return errors.Wrap(err, "Decomposedfs: could not set name attribute") - } - // set ParentidAttr to restorePath's node parent id + attrs := map[string]string{ + // update name attribute + prefixes.NameAttr: targetNode.Name, + } if trashPath != "" { - if err := recycleNode.SetXattr(prefixes.ParentidAttr, targetNode.ParentID); err != nil { - return errors.Wrap(err, "Decomposedfs: could not set name attribute") - } + // set ParentidAttr to restorePath's node parent id + attrs[prefixes.ParentidAttr] = targetNode.ParentID + } + + if err = recycleNode.SetXattrs(attrs, true); err != nil { + return errors.Wrap(err, "Decomposedfs: could not update recycle node") } // delete item link in trash @@ -777,19 +780,27 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err if t.treeTimeAccounting || (t.treeSizeAccounting && sizeDiff != 0) { attrs := map[string]string{} + var f *lockedfile.File // lock node before reading treesize or tree time - nodeLock, err := filelocks.AcquireWriteLock(n.InternalPath()) + switch t.lookup.MetadataBackend().(type) { + case metadata.IniBackend: + f, err = lockedfile.OpenFile(t.lookup.MetadataBackend().MetadataPath(n.InternalPath()), os.O_RDWR|os.O_CREATE, 0600) + case metadata.XattrsBackend: + // we have to use dedicated lockfiles to lock directories + // this only works because the xattr backend also locks folders with separate lock files + f, err = lockedfile.OpenFile(n.InternalPath()+filelocks.LockFileSuffix, os.O_RDWR|os.O_CREATE, 0600) + } if err != nil { return err } - // always unlock node - releaseLock := func() { - // ReleaseLock returns nil if already unlocked - if err := filelocks.ReleaseLock(nodeLock); err != nil { - sublog.Err(err).Msg("Decomposedfs: could not unlock parent node") + // always log error if closing node fails + defer func() { + // ignore already closed error + cerr := f.Close() + if err == nil && cerr != nil && !errors.Is(cerr, os.ErrClosed) { + err = cerr // only overwrite err with en error from close if the former was nil } - } - defer releaseLock() + }() if t.treeTimeAccounting { // update the parent tree time if it is older than the nodes mtime @@ -857,10 +868,10 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err return err } - // Release node lock early, returns nil if already unlocked - err = filelocks.ReleaseLock(nodeLock) - if err != nil { - return errtypes.InternalError(err.Error()) + // Release node lock early, ignore already closed error + cerr := f.Close() + if cerr != nil && !errors.Is(cerr, os.ErrClosed) { + return cerr } } } @@ -942,14 +953,14 @@ func (t *Tree) DeleteBlob(node *node.Node) error { } // TODO check if node exists? -func (t *Tree) createDirNode(n *node.Node) (err error) { +func (t *Tree) createDirNode(ctx context.Context, n *node.Node) (err error) { // create a directory node nodePath := n.InternalPath() if err := os.MkdirAll(nodePath, 0700); err != nil { return errors.Wrap(err, "Decomposedfs: error creating node") } - return n.WriteAllNodeMetadata() + return n.WriteAllNodeMetadata(ctx) } var nodeIDRegep = regexp.MustCompile(`.*/nodes/([^.]*).*`) diff --git a/pkg/storage/utils/decomposedfs/upload/processing.go b/pkg/storage/utils/decomposedfs/upload/processing.go index b9195c7817..187757bb4e 100644 --- a/pkg/storage/utils/decomposedfs/upload/processing.go +++ b/pkg/storage/utils/decomposedfs/upload/processing.go @@ -38,15 +38,15 @@ import ( "github.com/cs3org/reva/v2/pkg/logger" "github.com/cs3org/reva/v2/pkg/storage/utils/chunking" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" - "github.com/cs3org/reva/v2/pkg/storage/utils/filelocks" "github.com/cs3org/reva/v2/pkg/storagespace" "github.com/cs3org/reva/v2/pkg/utils" - "github.com/gofrs/flock" "github.com/google/uuid" "github.com/pkg/errors" + "github.com/rogpeppe/go-internal/lockedfile" tusd "github.com/tus/tusd/pkg/handler" ) @@ -266,15 +266,18 @@ func CreateNodeForUpload(upload *Upload, initAttrs map[string]string) (*node.Nod return nil, err } - var lock *flock.Flock + var f *lockedfile.File switch n.ID { case "": - lock, err = initNewNode(upload, n, uint64(fsize)) + f, err = initNewNode(upload, n, uint64(fsize)) default: - lock, err = updateExistingNode(upload, n, spaceID, uint64(fsize)) + f, err = updateExistingNode(upload, n, spaceID, uint64(fsize)) } - - defer filelocks.ReleaseLock(lock) //nolint:errcheck + defer func() { + if err := f.Close(); err != nil { + appctx.GetLogger(upload.Ctx).Error().Err(err).Str("nodeid", n.ID).Str("parentid", n.ParentID).Msg("could not close lock") + } + }() if err != nil { return nil, err } @@ -316,7 +319,7 @@ func CreateNodeForUpload(upload *Upload, initAttrs map[string]string) (*node.Nod return n, nil } -func initNewNode(upload *Upload, n *node.Node, fsize uint64) (*flock.Flock, error) { +func initNewNode(upload *Upload, n *node.Node, fsize uint64) (*lockedfile.File, error) { n.ID = uuid.New().String() // create folder structure (if needed) @@ -324,18 +327,24 @@ func initNewNode(upload *Upload, n *node.Node, fsize uint64) (*flock.Flock, erro return nil, err } - if _, err := os.Create(n.InternalPath()); err != nil { + // create and write lock new node metadata + f, err := lockedfile.OpenFile(upload.lu.MetadataBackend().MetadataPath(n.InternalPath()), os.O_RDWR|os.O_CREATE, 0600) + if err != nil { return nil, err } - lock, err := filelocks.AcquireWriteLock(n.InternalPath()) - if err != nil { - // we cannot acquire a lock - we error for safety - return lock, err + if _, ok := upload.lu.MetadataBackend().(metadata.IniBackend); ok { + // for the ini backend we also need to touch the actual node file here. + // it stores the mtime of the resource, which must not change when we update the ini file + h, err := os.OpenFile(n.InternalPath(), os.O_CREATE, 0600) + if err != nil { + return f, err + } + h.Close() } if _, err := node.CheckQuota(n.SpaceRoot, false, 0, fsize); err != nil { - return lock, err + return f, err } // link child name to parent if it is new @@ -343,23 +352,23 @@ func initNewNode(upload *Upload, n *node.Node, fsize uint64) (*flock.Flock, erro link, err := os.Readlink(childNameLink) if err == nil && link != "../"+n.ID { if err := os.Remove(childNameLink); err != nil { - return lock, errors.Wrap(err, "Decomposedfs: could not remove symlink child entry") + return f, errors.Wrap(err, "Decomposedfs: could not remove symlink child entry") } } if errors.Is(err, iofs.ErrNotExist) || link != "../"+n.ID { relativeNodePath := filepath.Join("../../../../../", lookup.Pathify(n.ID, 4, 2)) if err = os.Symlink(relativeNodePath, childNameLink); err != nil { - return lock, errors.Wrap(err, "Decomposedfs: could not symlink child entry") + return f, errors.Wrap(err, "Decomposedfs: could not symlink child entry") } } // on a new file the sizeDiff is the fileSize upload.sizeDiff = int64(fsize) upload.Info.MetaData["sizeDiff"] = strconv.Itoa(int(upload.sizeDiff)) - return lock, nil + return f, nil } -func updateExistingNode(upload *Upload, n *node.Node, spaceID string, fsize uint64) (*flock.Flock, error) { +func updateExistingNode(upload *Upload, n *node.Node, spaceID string, fsize uint64) (*lockedfile.File, error) { old, _ := node.ReadNode(upload.Ctx, upload.lu, spaceID, n.ID, false) if _, err := node.CheckQuota(n.SpaceRoot, true, uint64(old.Blobsize), fsize); err != nil { return nil, err @@ -389,15 +398,15 @@ func updateExistingNode(upload *Upload, n *node.Node, spaceID string, fsize uint targetPath := n.InternalPath() - lock, err := filelocks.AcquireWriteLock(targetPath) + // write lock existing node before reading treesize or tree time + f, err := lockedfile.OpenFile(upload.lu.MetadataBackend().MetadataPath(targetPath), os.O_RDWR, 0600) if err != nil { - // we cannot acquire a lock - we error for safety return nil, err } // create version node if _, err := os.Create(upload.versionsPath); err != nil { - return lock, err + return f, err } // copy blob metadata to version node @@ -406,13 +415,13 @@ func updateExistingNode(upload *Upload, n *node.Node, spaceID string, fsize uint attributeName == prefixes.TypeAttr || attributeName == prefixes.BlobIDAttr || attributeName == prefixes.BlobsizeAttr - }, lock); err != nil { - return lock, err + }, f); err != nil { + return f, err } // keep mtime from previous version if err := os.Chtimes(upload.versionsPath, tmtime, tmtime); err != nil { - return lock, errtypes.InternalError(fmt.Sprintf("failed to change mtime of version node: %s", err)) + return f, errtypes.InternalError(fmt.Sprintf("failed to change mtime of version node: %s", err)) } // update mtime of current version @@ -421,7 +430,7 @@ func updateExistingNode(upload *Upload, n *node.Node, spaceID string, fsize uint return nil, err } - return lock, nil + return f, nil } // lookupNode looks up nodes by path.