From 4339b8ab47599646608f1efaba8450502173683f Mon Sep 17 00:00:00 2001 From: Andre Duffeck Date: Mon, 13 Mar 2023 16:13:35 +0100 Subject: [PATCH] Fix ini metadata backend (#3711) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Use msgpack to serialize cache data instead of json * Use storage-relative paths as the cache key That prevents non-aligned cache keys when multiple providers mount the storage at a different location. * Create dedicated type for node attributes, based on map[string][]byte * add messagepack backend Signed-off-by: Jörn Friedrich Dreyer * Switch msgpack library to a faster, leaner one * Adapt changelog * Cleanup * Fix test * Fix error message * Use messagepack backend in the s3ng acceptance tests * make hound happy Signed-off-by: Jörn Friedrich Dreyer * flip tests Signed-off-by: Jörn Friedrich Dreyer * Do not try to get the tmtime for files being uploaded * Extend the metadata backend to read the attributes from a locked source * Fix comment --------- Signed-off-by: Jörn Friedrich Dreyer Co-authored-by: Jörn Friedrich Dreyer --- changelog/unreleased/replace-ini-backend.md | 5 + go.mod | 3 +- go.sum | 6 +- pkg/storage/cache/cache.go | 8 +- pkg/storage/cache/filemetadata.go | 16 + .../utils/decomposedfs/decomposedfs.go | 8 +- .../decomposedfs_concurrency_test.go | 20 ++ pkg/storage/utils/decomposedfs/grants.go | 6 +- pkg/storage/utils/decomposedfs/grants_test.go | 2 +- .../utils/decomposedfs/lookup/lookup.go | 80 ++--- pkg/storage/utils/decomposedfs/metadata.go | 2 +- .../decomposedfs/metadata/ini_backend.go | 301 ------------------ .../metadata/messagepack_backend.go | 277 ++++++++++++++++ .../utils/decomposedfs/metadata/metadata.go | 29 +- .../decomposedfs/metadata/metadata_test.go | 116 +++---- .../decomposedfs/metadata/xattrs_backend.go | 32 +- pkg/storage/utils/decomposedfs/node/node.go | 159 ++++----- .../utils/decomposedfs/node/node_test.go | 2 +- pkg/storage/utils/decomposedfs/node/xattrs.go | 
78 ++++- pkg/storage/utils/decomposedfs/recycle.go | 8 +- pkg/storage/utils/decomposedfs/spaces.go | 88 +++-- .../utils/decomposedfs/testhelpers/helpers.go | 4 +- .../utils/decomposedfs/tree/migrations.go | 2 +- pkg/storage/utils/decomposedfs/tree/tree.go | 103 +++--- .../utils/decomposedfs/tree/tree_test.go | 4 +- .../utils/decomposedfs/upload/processing.go | 78 +++-- .../utils/decomposedfs/upload/upload.go | 8 +- pkg/storage/utils/decomposedfs/upload_test.go | 2 +- .../drone/storage-users-ocis.toml | 2 + .../drone/storage-users-s3ng.toml | 4 +- 30 files changed, 767 insertions(+), 686 deletions(-) create mode 100644 changelog/unreleased/replace-ini-backend.md delete mode 100644 pkg/storage/utils/decomposedfs/metadata/ini_backend.go create mode 100644 pkg/storage/utils/decomposedfs/metadata/messagepack_backend.go diff --git a/changelog/unreleased/replace-ini-backend.md b/changelog/unreleased/replace-ini-backend.md new file mode 100644 index 0000000000..910520645e --- /dev/null +++ b/changelog/unreleased/replace-ini-backend.md @@ -0,0 +1,5 @@ +Bugfix: Replace ini metadata backend by messagepack backend + +We replaced the ini metadata backend by a messagepack backend which is more robust and also uses less resources. 
+ +https://github.com/cs3org/reva/pull/3711 diff --git a/go.mod b/go.mod index 8d2d483baf..60fdc5b49e 100644 --- a/go.mod +++ b/go.mod @@ -64,6 +64,7 @@ require ( github.com/rs/zerolog v1.28.0 github.com/sciencemesh/meshdirectory-web v1.0.4 github.com/sethvargo/go-password v0.2.0 + github.com/shamaton/msgpack/v2 v2.1.1 github.com/stretchr/testify v1.8.1 github.com/studio-b12/gowebdav v0.0.0-20221015232716-17255f2e7423 github.com/test-go/testify v1.1.4 @@ -86,7 +87,6 @@ require ( google.golang.org/genproto v0.0.0-20221024183307-1bc688fe9f3e google.golang.org/grpc v1.50.1 google.golang.org/protobuf v1.28.1 - gopkg.in/ini.v1 v1.67.0 gotest.tools v2.2.0+incompatible ) @@ -204,6 +204,7 @@ require ( golang.org/x/time v0.1.0 // indirect golang.org/x/tools v0.4.0 // indirect google.golang.org/appengine v1.6.7 // indirect + gopkg.in/ini.v1 v1.66.6 // indirect gopkg.in/square/go-jose.v2 v2.6.0 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect diff --git a/go.sum b/go.sum index 28c4ff19be..9de019ff69 100644 --- a/go.sum +++ b/go.sum @@ -853,6 +853,8 @@ github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNX github.com/sethgrid/pester v0.0.0-20190127155807-68a33a018ad0/go.mod h1:Ad7IjTpvzZO8Fl0vh9AzQ+j/jYZfyp2diGwI8m5q+ns= github.com/sethvargo/go-password v0.2.0 h1:BTDl4CC/gjf/axHMaDQtw507ogrXLci6XRiLc7i/UHI= github.com/sethvargo/go-password v0.2.0/go.mod h1:Ym4Mr9JXLBycr02MFuVQ/0JHidNetSgbzutTr3zsYXE= +github.com/shamaton/msgpack/v2 v2.1.1 h1:gAMxOtVJz93R0EwewwUc8tx30n34aV6BzJuwHE8ogAk= +github.com/shamaton/msgpack/v2 v2.1.1/go.mod h1:aTUEmh31ziGX1Ml7wMPLVY0f4vT3CRsCvZRoSCs+VGg= github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 h1:bUGsEnyNbVPw06Bs80sCeARAlK8lhwqGyi6UT8ymuGk= github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod 
h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg= github.com/shurcooL/vfsgen v0.0.0-20200824052919-0d455de96546 h1:pXY9qYc/MP5zdvqWEUH6SjNiu7VhSjuVFTFiTcphaLU= @@ -1551,8 +1553,8 @@ gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/h2non/gock.v1 v1.0.14/go.mod h1:sX4zAkdYX1TRGJ2JY156cFspQn4yRWn6p9EMdODlynE= gopkg.in/h2non/gock.v1 v1.1.2/go.mod h1:n7UGz/ckNChHiK05rDoiC4MYSunEC/lyaUm2WWaDva0= -gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= -gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/ini.v1 v1.66.6 h1:LATuAqN/shcYAOkv3wl2L4rkaKqkcgTBQjOyYDvcPKI= +gopkg.in/ini.v1 v1.66.6/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/square/go-jose.v2 v2.6.0 h1:NGk74WTnPKBNUhNzQX7PYcTLUjoq7mzKk2OKbvwk2iI= gopkg.in/square/go-jose.v2 v2.6.0/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= gopkg.in/telebot.v3 v3.0.0/go.mod h1:7rExV8/0mDDNu9epSrDm/8j22KLaActH1Tbee6YjzWg= diff --git a/pkg/storage/cache/cache.go b/pkg/storage/cache/cache.go index e4fab668f4..98896ad7f0 100644 --- a/pkg/storage/cache/cache.go +++ b/pkg/storage/cache/cache.go @@ -19,7 +19,6 @@ package cache import ( - "encoding/json" "fmt" "strings" "sync" @@ -32,6 +31,7 @@ import ( redisopts "github.com/go-redis/redis/v8" "github.com/nats-io/nats.go" microetcd "github.com/owncloud/ocis/v2/ocis-pkg/store/etcd" + "github.com/shamaton/msgpack/v2" microstore "go-micro.dev/v4/store" ) @@ -84,6 +84,7 @@ type CreatePersonalSpaceCache interface { // FileMetadataCache handles file metadata type FileMetadataCache interface { Cache + RemoveMetadata(path string) error } // GetStatCache will return an existing StatCache for the given store, nodes, database and table @@ -238,12 +239,13 @@ func (cache cacheStore) PullFromCache(key string, dest interface{}) error { if len(r) == 0 { return 
fmt.Errorf("not found") } - return json.Unmarshal(r[0].Value, dest) + + return msgpack.Unmarshal(r[0].Value, &dest) } // PushToCache pushes a key and value to the configured database and table of the underlying store func (cache cacheStore) PushToCache(key string, src interface{}) error { - b, err := json.Marshal(src) + b, err := msgpack.Marshal(src) if err != nil { return err } diff --git a/pkg/storage/cache/filemetadata.go b/pkg/storage/cache/filemetadata.go index 0966b40e47..1dc6866a7a 100644 --- a/pkg/storage/cache/filemetadata.go +++ b/pkg/storage/cache/filemetadata.go @@ -19,6 +19,7 @@ package cache import ( + "strings" "time" ) @@ -36,3 +37,18 @@ func NewFileMetadataCache(store string, nodes []string, database, table string, return c } + +// RemoveMetadata removes a reference from the metadata cache +func (c *fileMetadataCache) RemoveMetadata(path string) error { + keys, err := c.List() + if err != nil { + return err + } + + for _, key := range keys { + if strings.HasPrefix(key, path) { + _ = c.Delete(key) + } + } + return nil +} diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go index 83d3beeab5..1273caeee9 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -107,10 +107,10 @@ func NewDefault(m map[string]interface{}, bs tree.Blobstore, es events.Stream) ( switch o.MetadataBackend { case "xattrs": lu = lookup.New(metadata.XattrsBackend{}, o) - case "ini": - lu = lookup.New(metadata.NewIniBackend(o.FileMetadataCache), o) + case "messagepack": + lu = lookup.New(metadata.NewMessagePackBackend(o.Root, o.FileMetadataCache), o) default: - return nil, fmt.Errorf("unknown metadata backend %s, only 'ini' or 'xattrs' (default) supported", o.MetadataBackend) + return nil, fmt.Errorf("unknown metadata backend %s, only 'messagepack' or 'xattrs' (default) supported", o.MetadataBackend) } tp := tree.New(o.Root, o.TreeTimeAccounting, 
o.TreeSizeAccounting, lu, bs) @@ -575,7 +575,7 @@ func (fs *Decomposedfs) CreateDir(ctx context.Context, ref *provider.Reference) if fs.o.TreeTimeAccounting || fs.o.TreeSizeAccounting { // mark the home node as the end of propagation - if err = n.SetXattr(prefixes.PropagationAttr, "1"); err != nil { + if err = n.SetXattrString(prefixes.PropagationAttr, "1"); err != nil { appctx.GetLogger(ctx).Error().Err(err).Interface("node", n).Msg("could not mark node to propagate") // FIXME: This does not return an error at all, but results in a severe situation that the diff --git a/pkg/storage/utils/decomposedfs/decomposedfs_concurrency_test.go b/pkg/storage/utils/decomposedfs/decomposedfs_concurrency_test.go index 6a6d8643e0..55999537c3 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs_concurrency_test.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs_concurrency_test.go @@ -112,6 +112,26 @@ var _ = Describe("Decomposed", func() { woFile.Close() Eventually(func() bool { s, _ := states.Load("managedToOpenroFile"); return s.(bool) }).Should(BeTrue()) }) + + It("allows opening rw while an exclusive lock is being held", func() { + states := sync.Map{} + + path, err := os.CreateTemp("", "decomposedfs-lockedfile-test-") + Expect(err).ToNot(HaveOccurred()) + states.Store("managedToOpenrwLockedFile", false) + woFile, err := lockedfile.OpenFile(path.Name(), os.O_WRONLY, 0) + Expect(err).ToNot(HaveOccurred()) + defer woFile.Close() + go func() { + roFile, err := os.OpenFile(path.Name(), os.O_RDWR, 0) + Expect(err).ToNot(HaveOccurred()) + defer roFile.Close() + states.Store("managedToOpenrwLockedFile", true) + }() + + woFile.Close() + Eventually(func() bool { s, _ := states.Load("managedToOpenrwLockedFile"); return s.(bool) }).Should(BeTrue()) + }) }) }) diff --git a/pkg/storage/utils/decomposedfs/grants.go b/pkg/storage/utils/decomposedfs/grants.go index 6a10bafb24..5a86fa7726 100644 --- a/pkg/storage/utils/decomposedfs/grants.go +++ 
b/pkg/storage/utils/decomposedfs/grants.go @@ -134,7 +134,7 @@ func (fs *Decomposedfs) ListGrants(ctx context.Context, ref *provider.Reference) return nil, errtypes.NotFound(f) } log := appctx.GetLogger(ctx) - var attrs map[string]string + var attrs node.Attributes if attrs, err = grantNode.Xattrs(); err != nil { log.Error().Err(err).Msg("error listing attributes") return nil, err @@ -146,7 +146,7 @@ func (fs *Decomposedfs) ListGrants(ctx context.Context, ref *provider.Reference) var err error var e *ace.ACE principal := k[len(prefixes.GrantPrefix):] - if e, err = ace.Unmarshal(principal, []byte(v)); err != nil { + if e, err = ace.Unmarshal(principal, v); err != nil { log.Error().Err(err).Str("principal", principal).Str("attr", k).Msg("could not unmarshal ace") continue } @@ -312,7 +312,7 @@ func (fs *Decomposedfs) storeGrant(ctx context.Context, n *node.Node, g *provide // set the grant e := ace.FromGrant(g) principal, value := e.Marshal() - if err := n.SetXattr(prefixes.GrantPrefix+principal, string(value)); err != nil { + if err := n.SetXattr(prefixes.GrantPrefix+principal, value); err != nil { appctx.GetLogger(ctx).Error().Err(err). 
Str("principal", principal).Msg("Could not set grant for principal") return err diff --git a/pkg/storage/utils/decomposedfs/grants_test.go b/pkg/storage/utils/decomposedfs/grants_test.go index b5ad54290b..142ff632a8 100644 --- a/pkg/storage/utils/decomposedfs/grants_test.go +++ b/pkg/storage/utils/decomposedfs/grants_test.go @@ -138,7 +138,7 @@ var _ = Describe("Grants", func() { Path: "/dir1", }) Expect(err).ToNot(HaveOccurred()) - attr, err := n.Xattr(prefixes.GrantUserAcePrefix + grant.Grantee.GetUserId().OpaqueId) + attr, err := n.XattrString(prefixes.GrantUserAcePrefix + grant.Grantee.GetUserId().OpaqueId) Expect(err).ToNot(HaveOccurred()) Expect(attr).To(Equal(fmt.Sprintf("\x00t=A:f=:p=rw:c=%s:e=0\n", o.GetOpaqueId()))) // NOTE: this tests ace package }) diff --git a/pkg/storage/utils/decomposedfs/lookup/lookup.go b/pkg/storage/utils/decomposedfs/lookup/lookup.go index e546c0cfe6..fe411c8057 100644 --- a/pkg/storage/utils/decomposedfs/lookup/lookup.go +++ b/pkg/storage/utils/decomposedfs/lookup/lookup.go @@ -23,7 +23,6 @@ import ( "fmt" "os" "path/filepath" - "strconv" "strings" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" @@ -33,9 +32,8 @@ import ( "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" - "github.com/cs3org/reva/v2/pkg/storage/utils/filelocks" - "github.com/gofrs/flock" "github.com/pkg/errors" + "github.com/rogpeppe/go-internal/lockedfile" ) // Lookup implements transformations from filepath to node and back @@ -60,14 +58,10 @@ func (lu *Lookup) MetadataBackend() metadata.Backend { // ReadBlobSizeAttr reads the blobsize from the xattrs func (lu *Lookup) ReadBlobSizeAttr(path string) (int64, error) { - attr, err := lu.metadataBackend.Get(path, prefixes.BlobsizeAttr) + blobSize, err := 
lu.metadataBackend.GetInt64(path, prefixes.BlobsizeAttr) if err != nil { return 0, errors.Wrapf(err, "error reading blobsize xattr") } - blobSize, err := strconv.ParseInt(attr, 10, 64) - if err != nil { - return 0, errors.Wrapf(err, "invalid blobsize xattr format") - } return blobSize, nil } @@ -77,22 +71,18 @@ func (lu *Lookup) ReadBlobIDAttr(path string) (string, error) { if err != nil { return "", errors.Wrapf(err, "error reading blobid xattr") } - return attr, nil + return string(attr), nil } // TypeFromPath returns the type of the node at the given path func (lu *Lookup) TypeFromPath(path string) provider.ResourceType { // Try to read from xattrs - typeAttr, err := lu.metadataBackend.Get(path, prefixes.TypeAttr) - t := provider.ResourceType_RESOURCE_TYPE_INVALID + typeAttr, err := lu.metadataBackend.GetInt64(path, prefixes.TypeAttr) if err == nil { - typeInt, err := strconv.ParseInt(typeAttr, 10, 32) - if err != nil { - return t - } - return provider.ResourceType(typeInt) + return provider.ResourceType(int32(typeAttr)) } + t := provider.ResourceType_RESOURCE_TYPE_INVALID // Fall back to checking on disk fi, err := os.Lstat(path) if err != nil { @@ -219,7 +209,7 @@ func (lu *Lookup) WalkPath(ctx context.Context, r *node.Node, p string, followRe if followReferences { if attrBytes, err := r.Xattr(prefixes.ReferenceAttr); err == nil { realNodeID := attrBytes - ref, err := refFromCS3([]byte(realNodeID)) + ref, err := refFromCS3(realNodeID) if err != nil { return nil, err } @@ -277,18 +267,20 @@ func refFromCS3(b []byte) (*provider.Reference, error) { // CopyMetadata copies all extended attributes from source to target. // The optional filter function can be used to filter by attribute name, e.g. by checking a prefix // For the source file, a shared lock is acquired. -// NOTE: target resource is not locked! You need to acquire a write lock on the target additionally +// NOTE: target resource will be write locked! 
func (lu *Lookup) CopyMetadata(src, target string, filter func(attributeName string) bool) (err error) { - var readLock *flock.Flock - // Acquire a read log on the source node - readLock, err = filelocks.AcquireReadLock(src) + // write lock existing node before reading treesize or tree time + f, err := lockedfile.Open(lu.MetadataBackend().MetadataPath(src)) + if err != nil { + return err + } if err != nil { return errors.Wrap(err, "xattrs: Unable to lock source to read") } defer func() { - rerr := filelocks.ReleaseLock(readLock) + rerr := f.Close() // if err is non nil we do not overwrite that if err == nil { @@ -296,50 +288,32 @@ func (lu *Lookup) CopyMetadata(src, target string, filter func(attributeName str } }() - return lu.CopyMetadataWithSourceLock(src, target, filter, readLock) + return lu.CopyMetadataWithSourceLock(src, target, filter, f) } // CopyMetadataWithSourceLock copies all extended attributes from source to target. // The optional filter function can be used to filter by attribute name, e.g. by checking a prefix -// For the source file, a shared lock is acquired. -// NOTE: target resource is not locked! You need to acquire a write lock on the target additionally -func (lu *Lookup) CopyMetadataWithSourceLock(src, target string, filter func(attributeName string) bool, readLock *flock.Flock) (err error) { +// For the source file, a matching lockedfile is required. +// NOTE: target resource will be write locked! 
+func (lu *Lookup) CopyMetadataWithSourceLock(sourcePath, targetPath string, filter func(attributeName string) bool, lockedSource *lockedfile.File) (err error) { switch { - case readLock == nil: + case lockedSource == nil: return errors.New("no lock provided") - case readLock.Path() != filelocks.FlockFile(src): + case lockedSource.File.Name() != lu.MetadataBackend().MetadataPath(sourcePath): return errors.New("lockpath does not match filepath") - case !readLock.Locked() && !readLock.RLocked(): // we need either a read or a write lock - return errors.New("not locked") } - // both locks are established. Copy. - var attrNameList []string - if attrNameList, err = lu.metadataBackend.List(src); err != nil { - return errors.Wrap(err, "Can not get xattr listing on src") + attrs, err := lu.metadataBackend.AllWithLockedSource(sourcePath, lockedSource) + if err != nil { + return err } - // error handling: We count errors of reads or writes of xattrs. - // if there were any read or write errors an error is returned. - var ( - xerrs = 0 - xerr error - ) - for idx := range attrNameList { - attrName := attrNameList[idx] + newAttrs := make(map[string][]byte, 0) + for attrName, val := range attrs { if filter == nil || filter(attrName) { - var attrVal string - if attrVal, xerr = lu.metadataBackend.Get(src, attrName); xerr != nil { - xerrs++ - } - if xerr = lu.metadataBackend.Set(target, attrName, attrVal); xerr != nil { - xerrs++ - } + newAttrs[attrName] = val } } - if xerrs > 0 { - err = errors.Wrap(xerr, "failed to copy all xattrs, last error returned") - } - return err + return lu.MetadataBackend().SetMultiple(targetPath, newAttrs, true) } diff --git a/pkg/storage/utils/decomposedfs/metadata.go b/pkg/storage/utils/decomposedfs/metadata.go index 686a4bdf30..31c96fef0a 100644 --- a/pkg/storage/utils/decomposedfs/metadata.go +++ b/pkg/storage/utils/decomposedfs/metadata.go @@ -111,7 +111,7 @@ func (fs *Decomposedfs) SetArbitraryMetadata(ctx context.Context, ref *provider. 
} for k, v := range md.Metadata { attrName := prefixes.MetadataPrefix + k - if err = n.SetXattr(attrName, v); err != nil { + if err = n.SetXattrString(attrName, v); err != nil { errs = append(errs, errors.Wrap(err, "Decomposedfs: could not set metadata attribute "+attrName+" to "+k)) } } diff --git a/pkg/storage/utils/decomposedfs/metadata/ini_backend.go b/pkg/storage/utils/decomposedfs/metadata/ini_backend.go deleted file mode 100644 index 6e96a84a75..0000000000 --- a/pkg/storage/utils/decomposedfs/metadata/ini_backend.go +++ /dev/null @@ -1,301 +0,0 @@ -// Copyright 2018-2023 CERN -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// In applying this license, CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. 
- -package metadata - -import ( - "encoding/base64" - "io" - "os" - "strconv" - "strings" - "time" - - "github.com/cs3org/reva/v2/pkg/storage/cache" - "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes" - "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" - "github.com/pkg/xattr" - "github.com/rogpeppe/go-internal/lockedfile" - "gopkg.in/ini.v1" -) - -// IniBackend persists the attributes in INI format inside the file -type IniBackend struct { - metaCache cache.FileMetadataCache -} - -type readWriteCloseSeekTruncater interface { - io.ReadWriteCloser - io.Seeker - Truncate(int64) error -} - -var encodedPrefixes = []string{prefixes.ChecksumPrefix, prefixes.MetadataPrefix, prefixes.GrantPrefix} - -// NewIniBackend returns a new IniBackend instance -func NewIniBackend(o options.CacheOptions) IniBackend { - return IniBackend{ - metaCache: cache.GetFileMetadataCache(o.CacheStore, o.CacheNodes, o.CacheDatabase, "filemetadata", 24*time.Hour), - } -} - -// All reads all extended attributes for a node -func (b IniBackend) All(path string) (map[string]string, error) { - path = b.MetadataPath(path) - - return b.loadMeta(path) -} - -// Get an extended attribute value for the given key -func (b IniBackend) Get(path, key string) (string, error) { - path = b.MetadataPath(path) - - attribs, err := b.loadMeta(path) - if err != nil { - return "", err - } - val, ok := attribs[key] - if !ok { - return "", &xattr.Error{Op: "ini.get", Path: path, Name: key, Err: xattr.ENOATTR} - } - return val, nil -} - -// GetInt64 reads a string as int64 from the xattrs -func (b IniBackend) GetInt64(path, key string) (int64, error) { - path = b.MetadataPath(path) - - attribs, err := b.loadMeta(path) - if err != nil { - return 0, err - } - val, ok := attribs[key] - if !ok { - return 0, &xattr.Error{Op: "ini.get", Path: path, Name: key, Err: xattr.ENOATTR} - } - i, err := strconv.ParseInt(val, 10, 64) - if err != nil { - 
return 0, err - } - return i, nil -} - -// List retrieves a list of names of extended attributes associated with the -// given path in the file system. -func (b IniBackend) List(path string) ([]string, error) { - path = b.MetadataPath(path) - - attribs, err := b.loadMeta(path) - if err != nil { - return nil, err - } - keys := []string{} - for k := range attribs { - keys = append(keys, k) - } - return keys, nil -} - -// Set sets one attribute for the given path -func (b IniBackend) Set(path, key, val string) error { - return b.SetMultiple(path, map[string]string{key: val}, true) -} - -// SetMultiple sets a set of attribute for the given path -func (b IniBackend) SetMultiple(path string, attribs map[string]string, acquireLock bool) error { - return b.saveIni(path, attribs, nil, acquireLock) -} - -// Remove an extended attribute key -func (b IniBackend) Remove(path, key string) error { - return b.saveIni(path, nil, []string{key}, true) -} - -func (b IniBackend) saveIni(path string, setAttribs map[string]string, deleteAttribs []string, acquireLock bool) error { - var ( - f readWriteCloseSeekTruncater - err error - ) - path = b.MetadataPath(path) - if acquireLock { - f, err = lockedfile.OpenFile(path, os.O_RDWR|os.O_CREATE, 0600) - } else { - f, err = os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0600) - } - if err != nil { - return err - } - defer f.Close() - - // Read current state - iniBytes, err := io.ReadAll(f) - if err != nil { - return err - } - iniFile, err := ini.Load(iniBytes) - if err != nil { - return err - } - - // Prepare new metadata - iniAttribs, err := decodedAttribs(iniFile) - if err != nil { - return err - } - for key, val := range setAttribs { - iniAttribs[key] = val - } - for _, key := range deleteAttribs { - delete(iniAttribs, key) - } - - // Truncate file - _, err = f.Seek(0, io.SeekStart) - if err != nil { - return err - } - err = f.Truncate(0) - if err != nil { - return err - } - - // Write new metadata to file - ini, err := ini.Load([]byte{}) - if 
err != nil { - return err - } - for key, val := range iniAttribs { - for _, prefix := range encodedPrefixes { - if strings.HasPrefix(key, prefix) { - val = base64.StdEncoding.EncodeToString([]byte(val)) - break - } - } - ini.Section("").Key(key).SetValue(val) - } - _, err = ini.WriteTo(f) - if err != nil { - return err - } - - return b.metaCache.PushToCache(path, iniAttribs) -} - -func (b IniBackend) loadMeta(path string) (map[string]string, error) { - var attribs map[string]string - err := b.metaCache.PullFromCache(path, &attribs) - if err == nil { - return attribs, err - } - - var iniFile *ini.File - f, err := os.ReadFile(path) - length := len(f) - - // Try to read the file without getting a lock first. We will either - // get the old or the new state or an empty byte array when the file - // was just truncated by a writer. - if err == nil && length > 0 { - iniFile, err = ini.Load(f) - if err != nil { - return nil, err - } - } else { - if os.IsNotExist(err) { - // some of the caller rely on ENOTEXISTS to be returned when the - // actual file (not the metafile) does not exist in order to - // determine whether a node exists or not -> stat the actual node - _, err := os.Stat(strings.TrimSuffix(path, ".ini")) - if err != nil { - return nil, err - } - return attribs, nil // no attributes set yet - } - - // // No cached entry found. 
Read from storage and store in cache - lockedFile, err := lockedfile.Open(path) - if err != nil { - return nil, err - } - defer lockedFile.Close() - - iniFile, err = ini.Load(lockedFile) - if err != nil { - return nil, err - } - } - - attribs, err = decodedAttribs(iniFile) - if err != nil { - return nil, err - } - - err = b.metaCache.PushToCache(path, attribs) - if err != nil { - return nil, err - } - - return attribs, nil -} - -// IsMetaFile returns whether the given path represents a meta file -func (IniBackend) IsMetaFile(path string) bool { return strings.HasSuffix(path, ".ini") } - -// Purge purges the data of a given path -func (b IniBackend) Purge(path string) error { - if err := b.metaCache.Delete(path); err != nil { - return err - } - return os.Remove(b.MetadataPath(path)) -} - -// Rename moves the data for a given path to a new path -func (b IniBackend) Rename(oldPath, newPath string) error { - data := map[string]string{} - _ = b.metaCache.PullFromCache(oldPath, &data) - err := b.metaCache.Delete(oldPath) - if err != nil { - return err - } - err = b.metaCache.PushToCache(newPath, data) - if err != nil { - return err - } - - return os.Rename(b.MetadataPath(oldPath), b.MetadataPath(newPath)) -} - -// MetadataPath returns the path of the file holding the metadata for the given path -func (IniBackend) MetadataPath(path string) string { return path + ".ini" } - -func decodedAttribs(iniFile *ini.File) (map[string]string, error) { - attribs := iniFile.Section("").KeysHash() - for key, val := range attribs { - for _, prefix := range encodedPrefixes { - if strings.HasPrefix(key, prefix) { - valBytes, err := base64.StdEncoding.DecodeString(val) - if err != nil { - return nil, err - } - attribs[key] = string(valBytes) - break - } - } - } - return attribs, nil -} diff --git a/pkg/storage/utils/decomposedfs/metadata/messagepack_backend.go b/pkg/storage/utils/decomposedfs/metadata/messagepack_backend.go new file mode 100644 index 0000000000..85f08c7ed3 --- /dev/null 
+++ b/pkg/storage/utils/decomposedfs/metadata/messagepack_backend.go @@ -0,0 +1,277 @@ +// Copyright 2018-2023 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package metadata + +import ( + "io" + "os" + "path/filepath" + "strconv" + "strings" + "time" + + "github.com/cs3org/reva/v2/pkg/storage/cache" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" + "github.com/pkg/xattr" + "github.com/rogpeppe/go-internal/lockedfile" + "github.com/shamaton/msgpack/v2" +) + +// MessagePackBackend persists the attributes in messagepack format inside the file +type MessagePackBackend struct { + rootPath string + metaCache cache.FileMetadataCache +} + +type readWriteCloseSeekTruncater interface { + io.ReadWriteCloser + io.Seeker + Truncate(int64) error +} + +// NewMessagePackBackend returns a new MessagePackBackend instance +func NewMessagePackBackend(rootPath string, o options.CacheOptions) MessagePackBackend { + return MessagePackBackend{ + rootPath: filepath.Clean(rootPath), + metaCache: cache.GetFileMetadataCache(o.CacheStore, o.CacheNodes, o.CacheDatabase, "filemetadata", 24*time.Hour), + } +} + +// All reads all extended attributes for a node +func (b MessagePackBackend) All(path string) 
(map[string][]byte, error) { + path = b.MetadataPath(path) + + return b.loadAttributes(path, nil) +} + +// Get an extended attribute value for the given key +func (b MessagePackBackend) Get(path, key string) ([]byte, error) { + path = b.MetadataPath(path) + + attribs, err := b.loadAttributes(path, nil) + if err != nil { + return []byte{}, err + } + val, ok := attribs[key] + if !ok { + return []byte{}, &xattr.Error{Op: "mpk.get", Path: path, Name: key, Err: xattr.ENOATTR} + } + return val, nil +} + +// GetInt64 reads a string as int64 from the xattrs +func (b MessagePackBackend) GetInt64(path, key string) (int64, error) { + path = b.MetadataPath(path) + + attribs, err := b.loadAttributes(path, nil) + if err != nil { + return 0, err + } + val, ok := attribs[key] + if !ok { + return 0, &xattr.Error{Op: "mpk.get", Path: path, Name: key, Err: xattr.ENOATTR} + } + i, err := strconv.ParseInt(string(val), 10, 64) + if err != nil { + return 0, err + } + return i, nil +} + +// List retrieves a list of names of extended attributes associated with the +// given path in the file system. 
+func (b MessagePackBackend) List(path string) ([]string, error) { + path = b.MetadataPath(path) + + attribs, err := b.loadAttributes(path, nil) + if err != nil { + return nil, err + } + keys := []string{} + for k := range attribs { + keys = append(keys, k) + } + return keys, nil +} + +// Set sets one attribute for the given path +func (b MessagePackBackend) Set(path, key string, val []byte) error { + return b.SetMultiple(path, map[string][]byte{key: val}, true) +} + +// SetMultiple sets a set of attribute for the given path +func (b MessagePackBackend) SetMultiple(path string, attribs map[string][]byte, acquireLock bool) error { + return b.saveAttributes(path, attribs, nil, acquireLock) +} + +// Remove an extended attribute key +func (b MessagePackBackend) Remove(path, key string) error { + return b.saveAttributes(path, nil, []string{key}, true) +} + +// AllWithLockedSource reads all extended attributes from the given reader (if possible). +// The path argument is used for storing the data in the cache +func (b MessagePackBackend) AllWithLockedSource(path string, source io.Reader) (map[string][]byte, error) { + path = b.MetadataPath(path) + return b.loadAttributes(path, source) +} + +func (b MessagePackBackend) saveAttributes(path string, setAttribs map[string][]byte, deleteAttribs []string, acquireLock bool) error { + var ( + f readWriteCloseSeekTruncater + err error + ) + path = b.MetadataPath(path) + if acquireLock { + f, err = lockedfile.OpenFile(path, os.O_RDWR|os.O_CREATE, 0600) + } else { + f, err = os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0600) + } + if err != nil { + return err + } + defer f.Close() + + // Invalidate cache early + _ = b.metaCache.RemoveMetadata(path) + + // Read current state + msgBytes, err := io.ReadAll(f) + if err != nil { + return err + } + attribs := map[string][]byte{} + if len(msgBytes) > 0 { + err = msgpack.Unmarshal(msgBytes, &attribs) + if err != nil { + return err + } + } + + // set new metadata + for key, val := range 
setAttribs { + attribs[key] = val + } + for _, key := range deleteAttribs { + delete(attribs, key) + } + + // Truncate file + _, err = f.Seek(0, io.SeekStart) + if err != nil { + return err + } + err = f.Truncate(0) + if err != nil { + return err + } + + // Write new metadata to file + d, err := msgpack.Marshal(attribs) + if err != nil { + return err + } + _, err = f.Write(d) + if err != nil { + return err + } + + return b.metaCache.PushToCache(b.cacheKey(path), attribs) +} + +func (b MessagePackBackend) loadAttributes(path string, source io.Reader) (map[string][]byte, error) { + attribs := map[string][]byte{} + err := b.metaCache.PullFromCache(b.cacheKey(path), &attribs) + if err == nil { + return attribs, err + } + + if source == nil { + source, err = lockedfile.Open(path) + // // No cached entry found. Read from storage and store in cache + if err != nil { + if os.IsNotExist(err) { + // some of the caller rely on ENOTEXISTS to be returned when the + // actual file (not the metafile) does not exist in order to + // determine whether a node exists or not -> stat the actual node + _, err := os.Stat(strings.TrimSuffix(path, ".mpk")) + if err != nil { + return nil, err + } + return attribs, nil // no attributes set yet + } + } + defer source.(*lockedfile.File).Close() + } + + msgBytes, err := io.ReadAll(source) + if err != nil { + return nil, err + } + if len(msgBytes) > 0 { + err = msgpack.Unmarshal(msgBytes, &attribs) + if err != nil { + return nil, err + } + } + + err = b.metaCache.PushToCache(b.cacheKey(path), attribs) + if err != nil { + return nil, err + } + + return attribs, nil +} + +// IsMetaFile returns whether the given path represents a meta file +func (MessagePackBackend) IsMetaFile(path string) bool { return strings.HasSuffix(path, ".mpk") } + +// Purge purges the data of a given path +func (b MessagePackBackend) Purge(path string) error { + if err := b.metaCache.RemoveMetadata(b.cacheKey(path)); err != nil { + return err + } + return 
os.Remove(b.MetadataPath(path)) +} + +// Rename moves the data for a given path to a new path +func (b MessagePackBackend) Rename(oldPath, newPath string) error { + data := map[string]string{} + _ = b.metaCache.PullFromCache(b.cacheKey(oldPath), &data) + err := b.metaCache.RemoveMetadata(b.cacheKey(oldPath)) + if err != nil { + return err + } + err = b.metaCache.PushToCache(b.cacheKey(newPath), data) + if err != nil { + return err + } + + return os.Rename(b.MetadataPath(oldPath), b.MetadataPath(newPath)) +} + +// MetadataPath returns the path of the file holding the metadata for the given path +func (MessagePackBackend) MetadataPath(path string) string { return path + ".mpk" } + +func (b MessagePackBackend) cacheKey(path string) string { + // rootPath is guaranteed to have no trailing slash + // the cache key shouldn't begin with a slash as some stores drop it which can cause + // confusion + return strings.TrimPrefix(path, b.rootPath+"/") +} diff --git a/pkg/storage/utils/decomposedfs/metadata/metadata.go b/pkg/storage/utils/decomposedfs/metadata/metadata.go index 6507ee01e3..23e2598441 100644 --- a/pkg/storage/utils/decomposedfs/metadata/metadata.go +++ b/pkg/storage/utils/decomposedfs/metadata/metadata.go @@ -18,34 +18,39 @@ package metadata -import "errors" +import ( + "errors" + "io" +) var errUnconfiguredError = errors.New("no metadata backend configured. 
Bailing out") // Backend defines the interface for file attribute backends type Backend interface { - All(path string) (map[string]string, error) - Get(path, key string) (string, error) + All(path string) (map[string][]byte, error) + Get(path, key string) ([]byte, error) GetInt64(path, key string) (int64, error) List(path string) (attribs []string, err error) - Set(path, key, val string) error - SetMultiple(path string, attribs map[string]string, acquireLock bool) error + Set(path, key string, val []byte) error + SetMultiple(path string, attribs map[string][]byte, acquireLock bool) error Remove(path, key string) error Purge(path string) error Rename(oldPath, newPath string) error IsMetaFile(path string) bool MetadataPath(path string) string + + AllWithLockedSource(path string, source io.Reader) (map[string][]byte, error) } // NullBackend is the default stub backend, used to enforce the configuration of a proper backend type NullBackend struct{} // All reads all extended attributes for a node -func (NullBackend) All(path string) (map[string]string, error) { return nil, errUnconfiguredError } +func (NullBackend) All(path string) (map[string][]byte, error) { return nil, errUnconfiguredError } // Get an extended attribute value for the given key -func (NullBackend) Get(path, key string) (string, error) { return "", errUnconfiguredError } +func (NullBackend) Get(path, key string) ([]byte, error) { return []byte{}, errUnconfiguredError } // GetInt64 reads a string as int64 from the xattrs func (NullBackend) GetInt64(path, key string) (int64, error) { return 0, errUnconfiguredError } @@ -55,10 +60,10 @@ func (NullBackend) GetInt64(path, key string) (int64, error) { return 0, errUnco func (NullBackend) List(path string) ([]string, error) { return nil, errUnconfiguredError } // Set sets one attribute for the given path -func (NullBackend) Set(path string, key string, val string) error { return errUnconfiguredError } +func (NullBackend) Set(path string, key string, val 
[]byte) error { return errUnconfiguredError } // SetMultiple sets a set of attribute for the given path -func (NullBackend) SetMultiple(path string, attribs map[string]string, acquireLock bool) error { +func (NullBackend) SetMultiple(path string, attribs map[string][]byte, acquireLock bool) error { return errUnconfiguredError } @@ -76,3 +81,9 @@ func (NullBackend) Rename(oldPath, newPath string) error { return errUnconfigure // MetadataPath returns the path of the file holding the metadata for the given path func (NullBackend) MetadataPath(path string) string { return "" } + +// AllWithLockedSource reads all extended attributes from the given reader +// The path argument is used for storing the data in the cache +func (NullBackend) AllWithLockedSource(path string, source io.Reader) (map[string][]byte, error) { + return nil, errUnconfiguredError +} diff --git a/pkg/storage/utils/decomposedfs/metadata/metadata_test.go b/pkg/storage/utils/decomposedfs/metadata/metadata_test.go index a3459b14f9..b889b70ea3 100644 --- a/pkg/storage/utils/decomposedfs/metadata/metadata_test.go +++ b/pkg/storage/utils/decomposedfs/metadata/metadata_test.go @@ -21,7 +21,6 @@ package metadata_test import ( "os" "path" - "strings" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" @@ -57,135 +56,105 @@ var _ = Describe("Backend", func() { } }) - Describe("IniBackend", func() { + Describe("MessagePackBackend", func() { BeforeEach(func() { - backend = metadata.NewIniBackend(options.CacheOptions{}) + backend = metadata.NewMessagePackBackend(tmpdir, options.CacheOptions{}) }) Describe("Set", func() { It("sets an attribute", func() { - err := backend.Set(file, "foo", "bar") + data := []byte(`bar\`) + err := backend.Set(file, "foo", data) Expect(err).ToNot(HaveOccurred()) - content, err := os.ReadFile(metafile) + readData, err := backend.Get(file, "foo") Expect(err).ToNot(HaveOccurred()) - 
Expect(string(content)).To(Equal("foo = bar\n")) + Expect(readData).To(Equal(data)) }) - It("updates an attribute", func() { - err := backend.Set(file, "foo", "bar") - Expect(err).ToNot(HaveOccurred()) - err = backend.Set(file, "foo", "baz") - Expect(err).ToNot(HaveOccurred()) - - content, err := os.ReadFile(metafile) - Expect(err).ToNot(HaveOccurred()) - Expect(string(content)).To(Equal("foo = baz\n")) - }) - - It("encodes where needed", func() { - err := backend.Set(file, "user.ocis.cs.foo", "bar") + It("handles funny strings", func() { + data := []byte(`bar\`) + err := backend.Set(file, "foo", data) Expect(err).ToNot(HaveOccurred()) - content, err := os.ReadFile(metafile) + readData, err := backend.Get(file, "foo") Expect(err).ToNot(HaveOccurred()) - Expect(string(content)).To(Equal("user.ocis.cs.foo = YmFy\n")) + Expect(readData).To(Equal(data)) }) - It("doesn't encode already encoded attributes", func() { - err := backend.Set(file, "user.ocis.cs.foo", "bar") - Expect(err).ToNot(HaveOccurred()) - - content, err := os.ReadFile(metafile) + It("updates an attribute", func() { + err := backend.Set(file, "foo", []byte("bar")) Expect(err).ToNot(HaveOccurred()) - Expect(string(content)).To(Equal("user.ocis.cs.foo = YmFy\n")) - - err = backend.Set(file, "user.something", "doesn'tmatter") + err = backend.Set(file, "foo", []byte("baz")) Expect(err).ToNot(HaveOccurred()) - content, err = os.ReadFile(metafile) + readData, err := backend.Get(file, "foo") Expect(err).ToNot(HaveOccurred()) - Expect(string(content)).To(ContainSubstring("user.ocis.cs.foo = YmFy\n")) + Expect(readData).To(Equal([]byte("baz"))) }) It("sets an empty attribute", func() { _, err := backend.Get(file, "foo") Expect(err).To(HaveOccurred()) - err = backend.Set(file, "foo", "") + err = backend.Set(file, "foo", []byte{}) Expect(err).ToNot(HaveOccurred()) v, err := backend.Get(file, "foo") Expect(err).ToNot(HaveOccurred()) - Expect(v).To(Equal("")) + Expect(v).To(Equal([]byte{})) }) }) 
Describe("SetMultiple", func() { It("sets attributes", func() { - err := backend.SetMultiple(file, map[string]string{"foo": "bar", "baz": "qux"}, true) + data := map[string][]byte{"foo": []byte("bar"), "baz": []byte("qux")} + err := backend.SetMultiple(file, data, true) Expect(err).ToNot(HaveOccurred()) - content, err := os.ReadFile(metafile) + readData, err := backend.All(file) Expect(err).ToNot(HaveOccurred()) - lines := strings.Split(strings.Trim(string(content), "\n"), "\n") - Expect(lines).To(ConsistOf("foo = bar", "baz = qux")) + Expect(readData).To(Equal(data)) }) It("updates an attribute", func() { - err := backend.Set(file, "foo", "bar") - Expect(err).ToNot(HaveOccurred()) - err = backend.SetMultiple(file, map[string]string{"foo": "bar", "baz": "qux"}, true) - Expect(err).ToNot(HaveOccurred()) + err := backend.Set(file, "foo", []byte("something")) - content, err := os.ReadFile(metafile) + data := map[string][]byte{"foo": []byte("bar"), "baz": []byte("qux")} Expect(err).ToNot(HaveOccurred()) - lines := strings.Split(strings.Trim(string(content), "\n"), "\n") - Expect(lines).To(ConsistOf("foo = bar", "baz = qux")) - }) - - It("encodes where needed", func() { - err := backend.SetMultiple(file, map[string]string{ - "user.ocis.something.foo": "bar", - "user.ocis.cs.foo": "bar", - "user.ocis.md.foo": "bar", - "user.ocis.grant.foo": "bar", - }, true) + err = backend.SetMultiple(file, data, true) Expect(err).ToNot(HaveOccurred()) - content, err := os.ReadFile(metafile) + readData, err := backend.All(file) Expect(err).ToNot(HaveOccurred()) - expected := []string{ - "user.ocis.something.foo=bar", - "user.ocis.cs.foo=YmFy", - "user.ocis.md.foo=YmFy", - "user.ocis.grant.foo=YmFy"} - Expect(strings.Split(strings.ReplaceAll(strings.Trim(string(content), "\n"), " ", ""), "\n")).To(ConsistOf(expected)) + Expect(readData).To(Equal(data)) }) }) Describe("All", func() { It("returns the entries", func() { - err := os.WriteFile(metafile, []byte("foo=123\nbar=baz"), 0600) + 
data := map[string][]byte{"foo": []byte("123"), "bar": []byte("baz")} + err := backend.SetMultiple(file, data, true) Expect(err).ToNot(HaveOccurred()) v, err := backend.All(file) Expect(err).ToNot(HaveOccurred()) Expect(len(v)).To(Equal(2)) - Expect(v["foo"]).To(Equal("123")) - Expect(v["bar"]).To(Equal("baz")) + Expect(v["foo"]).To(Equal([]byte("123"))) + Expect(v["bar"]).To(Equal([]byte("baz"))) }) It("returns an empty map", func() { v, err := backend.All(file) Expect(err).ToNot(HaveOccurred()) - Expect(v).To(Equal(map[string]string{})) + Expect(v).To(Equal(map[string][]byte{})) }) }) Describe("List", func() { It("returns the entries", func() { - err := os.WriteFile(metafile, []byte("foo = 123\nbar = baz"), 0600) + data := map[string][]byte{"foo": []byte("123"), "bar": []byte("baz")} + err := backend.SetMultiple(file, data, true) Expect(err).ToNot(HaveOccurred()) v, err := backend.List(file) @@ -202,12 +171,13 @@ var _ = Describe("Backend", func() { Describe("Get", func() { It("returns the attribute", func() { - err := os.WriteFile(metafile, []byte("foo = \"bar\"\n"), 0600) + data := map[string][]byte{"foo": []byte("bar")} + err := backend.SetMultiple(file, data, true) Expect(err).ToNot(HaveOccurred()) v, err := backend.Get(file, "foo") Expect(err).ToNot(HaveOccurred()) - Expect(v).To(Equal("bar")) + Expect(v).To(Equal([]byte("bar"))) }) It("returns an error on unknown attributes", func() { @@ -218,7 +188,8 @@ var _ = Describe("Backend", func() { Describe("GetInt64", func() { It("returns the attribute", func() { - err := os.WriteFile(metafile, []byte("foo=123\n"), 0600) + data := map[string][]byte{"foo": []byte("123")} + err := backend.SetMultiple(file, data, true) Expect(err).ToNot(HaveOccurred()) v, err := backend.GetInt64(file, "foo") @@ -232,14 +203,15 @@ var _ = Describe("Backend", func() { }) }) - Describe("Get", func() { + Describe("Remove", func() { It("deletes an attribute", func() { - err := os.WriteFile(metafile, []byte("foo=bar\n"), 0600) + data := 
map[string][]byte{"foo": []byte("bar")} + err := backend.SetMultiple(file, data, true) Expect(err).ToNot(HaveOccurred()) v, err := backend.Get(file, "foo") Expect(err).ToNot(HaveOccurred()) - Expect(v).To(Equal("bar")) + Expect(v).To(Equal([]byte("bar"))) err = backend.Remove(file, "foo") Expect(err).ToNot(HaveOccurred()) @@ -251,7 +223,7 @@ var _ = Describe("Backend", func() { Describe("IsMetaFile", func() { It("returns true", func() { - Expect(backend.IsMetaFile("foo.ini")).To(BeTrue()) + Expect(backend.IsMetaFile("foo.mpk")).To(BeTrue()) }) It("returns false", func() { diff --git a/pkg/storage/utils/decomposedfs/metadata/xattrs_backend.go b/pkg/storage/utils/decomposedfs/metadata/xattrs_backend.go index 88c99083e7..a13ec2dc02 100644 --- a/pkg/storage/utils/decomposedfs/metadata/xattrs_backend.go +++ b/pkg/storage/utils/decomposedfs/metadata/xattrs_backend.go @@ -19,6 +19,7 @@ package metadata import ( + "io" "os" "path/filepath" "strconv" @@ -35,13 +36,8 @@ type XattrsBackend struct{} // Get an extended attribute value for the given key // No file locking is involved here as reading a single xattr is // considered to be atomic. 
-func (b XattrsBackend) Get(filePath, key string) (string, error) { - v, err := xattr.Get(filePath, key) - if err != nil { - return "", err - } - val := string(v) - return val, nil +func (b XattrsBackend) Get(filePath, key string) ([]byte, error) { + return xattr.Get(filePath, key) } // GetInt64 reads a string as int64 from the xattrs @@ -50,7 +46,7 @@ func (b XattrsBackend) GetInt64(filePath, key string) (int64, error) { if err != nil { return 0, err } - v, err := strconv.ParseInt(attr, 10, 64) + v, err := strconv.ParseInt(string(attr), 10, 64) if err != nil { return 0, err } @@ -76,7 +72,7 @@ func (XattrsBackend) List(filePath string) (attribs []string, err error) { // All reads all extended attributes for a node, protected by a // shared file lock -func (b XattrsBackend) All(filePath string) (attribs map[string]string, err error) { +func (b XattrsBackend) All(filePath string) (attribs map[string][]byte, err error) { attrNames, err := b.List(filePath) if err != nil { @@ -89,13 +85,13 @@ func (b XattrsBackend) All(filePath string) (attribs map[string]string, err erro ) // error handling: Count if there are errors while reading all attribs. // if there were any, return an error. 
- attribs = make(map[string]string, len(attrNames)) + attribs = make(map[string][]byte, len(attrNames)) for _, name := range attrNames { var val []byte if val, xerr = xattr.Get(filePath, name); xerr != nil { xerrs++ } else { - attribs[name] = string(val) + attribs[name] = val } } @@ -107,12 +103,12 @@ func (b XattrsBackend) All(filePath string) (attribs map[string]string, err erro } // Set sets one attribute for the given path -func (b XattrsBackend) Set(path string, key string, val string) (err error) { - return b.SetMultiple(path, map[string]string{key: val}, true) +func (b XattrsBackend) Set(path string, key string, val []byte) (err error) { + return b.SetMultiple(path, map[string][]byte{key: val}, true) } // SetMultiple sets a set of attribute for the given path -func (XattrsBackend) SetMultiple(path string, attribs map[string]string, acquireLock bool) (err error) { +func (XattrsBackend) SetMultiple(path string, attribs map[string][]byte, acquireLock bool) (err error) { if acquireLock { err := os.MkdirAll(filepath.Dir(path), 0600) if err != nil { @@ -132,7 +128,7 @@ func (XattrsBackend) SetMultiple(path string, attribs map[string]string, acquire xerr error ) for key, val := range attribs { - if xerr = xattr.Set(path, key, []byte(val)); xerr != nil { + if xerr = xattr.Set(path, key, val); xerr != nil { // log xerrs++ } @@ -171,3 +167,9 @@ func cleanupLockfile(f *lockedfile.File) { _ = f.Close() _ = os.Remove(f.Name()) } + +// AllWithLockedSource reads all extended attributes from the given reader. 
+// The path argument is used for storing the data in the cache +func (b XattrsBackend) AllWithLockedSource(path string, _ io.Reader) (map[string][]byte, error) { + return b.All(path) +} diff --git a/pkg/storage/utils/decomposedfs/node/node.go b/pkg/storage/utils/decomposedfs/node/node.go index 23f671eb12..2306edd85c 100644 --- a/pkg/storage/utils/decomposedfs/node/node.go +++ b/pkg/storage/utils/decomposedfs/node/node.go @@ -87,7 +87,7 @@ type Node struct { SpaceRoot *Node lu PathLookup - xattrsCache map[string]string + xattrsCache map[string][]byte nodeType *provider.ResourceType } @@ -128,13 +128,9 @@ func (n *Node) Type() provider.ResourceType { t := provider.ResourceType_RESOURCE_TYPE_INVALID // Try to read from xattrs - typeAttr, err := n.Xattr(prefixes.TypeAttr) + typeAttr, err := n.XattrInt32(prefixes.TypeAttr) if err == nil { - typeInt, err := strconv.ParseInt(typeAttr, 10, 32) - if err != nil { - return t - } - t = provider.ResourceType(typeInt) + t = provider.ResourceType(typeAttr) n.nodeType = &t return t } @@ -172,9 +168,10 @@ func (n *Node) SetType(t provider.ResourceType) { func (n *Node) ChangeOwner(new *userpb.UserId) (err error) { n.SpaceRoot.owner = new - var attribs = map[string]string{prefixes.OwnerIDAttr: new.OpaqueId, - prefixes.OwnerIDPAttr: new.Idp, - prefixes.OwnerTypeAttr: utils.UserTypeToString(new.Type)} + attribs := Attributes{} + attribs.SetString(prefixes.OwnerIDAttr, new.OpaqueId) + attribs.SetString(prefixes.OwnerIDPAttr, new.Idp) + attribs.SetString(prefixes.OwnerTypeAttr, utils.UserTypeToString(new.Type)) if err := n.SpaceRoot.SetXattrs(attribs, true); err != nil { return err @@ -184,14 +181,13 @@ func (n *Node) ChangeOwner(new *userpb.UserId) (err error) { } // WriteAllNodeMetadata writes the Node metadata to disk -func (n *Node) WriteAllNodeMetadata() (err error) { - attribs := make(map[string]string) - - attribs[prefixes.TypeAttr] = strconv.FormatInt(int64(n.Type()), 10) - attribs[prefixes.ParentidAttr] = n.ParentID - 
attribs[prefixes.NameAttr] = n.Name - attribs[prefixes.BlobIDAttr] = n.BlobID - attribs[prefixes.BlobsizeAttr] = strconv.FormatInt(n.Blobsize, 10) +func (n *Node) WriteAllNodeMetadata(ctx context.Context) (err error) { + attribs := Attributes{} + attribs.SetInt64(prefixes.TypeAttr, int64(n.Type())) + attribs.SetString(prefixes.ParentidAttr, n.ParentID) + attribs.SetString(prefixes.NameAttr, n.Name) + attribs.SetString(prefixes.BlobIDAttr, n.BlobID) + attribs.SetInt64(prefixes.BlobsizeAttr, n.Blobsize) return n.SetXattrs(attribs, true) } @@ -199,11 +195,12 @@ func (n *Node) WriteAllNodeMetadata() (err error) { // WriteOwner writes the space owner func (n *Node) WriteOwner(owner *userpb.UserId) error { n.SpaceRoot.owner = owner - attribs := map[string]string{ - prefixes.OwnerIDAttr: owner.OpaqueId, - prefixes.OwnerIDPAttr: owner.Idp, - prefixes.OwnerTypeAttr: utils.UserTypeToString(owner.Type), - } + + attribs := Attributes{} + attribs.SetString(prefixes.OwnerIDAttr, owner.OpaqueId) + attribs.SetString(prefixes.OwnerIDPAttr, owner.Idp) + attribs.SetString(prefixes.OwnerTypeAttr, utils.UserTypeToString(owner.Type)) + if err := n.SpaceRoot.SetXattrs(attribs, true); err != nil { return err } @@ -254,7 +251,7 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis r.Exists = true // lookup name in extended attributes - r.Name, err = r.Xattr(prefixes.NameAttr) + r.Name, err = r.XattrString(prefixes.NameAttr) if err != nil { return nil, err } @@ -312,9 +309,13 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis } n.Exists = true - n.Name = attrs[prefixes.NameAttr] - n.ParentID = attrs[prefixes.ParentidAttr] + n.Name = attrs.String(prefixes.NameAttr) + n.ParentID = attrs.String(prefixes.ParentidAttr) if n.ParentID == "" { + d, _ := os.ReadFile(lu.MetadataBackend().MetadataPath(n.InternalPath())) + if _, ok := lu.MetadataBackend().(metadata.MessagePackBackend); ok { + appctx.GetLogger(ctx).Error().Str("nodeid", 
n.ID).Interface("attrs", attrs).Bytes("messagepack", d).Msg("missing parent id") + } return nil, errtypes.InternalError("Missing parent ID on node") } // TODO why do we stat the parent? to determine if the current node is in the trash we would need to traverse all parents... @@ -348,8 +349,8 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis } if revisionSuffix == "" { - n.BlobID = attrs[prefixes.BlobIDAttr] - blobSize, err := strconv.ParseInt(attrs[prefixes.BlobsizeAttr], 10, 64) + n.BlobID = attrs.String(prefixes.BlobIDAttr) + blobSize, err := attrs.Int64(prefixes.BlobsizeAttr) if err != nil { return nil, err } @@ -438,8 +439,8 @@ func (n *Node) Parent() (p *Node, err error) { } // lookup name and parent id in extended attributes - p.ParentID, _ = p.Xattr(prefixes.ParentidAttr) - p.Name, _ = p.Xattr(prefixes.NameAttr) + p.ParentID, _ = p.XattrString(prefixes.ParentidAttr) + p.Name, _ = p.XattrString(prefixes.NameAttr) // check node exists if _, err := os.Stat(p.InternalPath()); err == nil { @@ -462,7 +463,7 @@ func (n *Node) readOwner() (*userpb.UserId, error) { var attr string var err error // lookup ID in extended attributes - attr, err = n.SpaceRoot.Xattr(prefixes.OwnerIDAttr) + attr, err = n.SpaceRoot.XattrString(prefixes.OwnerIDAttr) switch { case err == nil: owner.OpaqueId = attr @@ -473,7 +474,7 @@ func (n *Node) readOwner() (*userpb.UserId, error) { } // lookup IDP in extended attributes - attr, err = n.SpaceRoot.Xattr(prefixes.OwnerIDPAttr) + attr, err = n.SpaceRoot.XattrString(prefixes.OwnerIDPAttr) switch { case err == nil: owner.Idp = attr @@ -484,7 +485,7 @@ func (n *Node) readOwner() (*userpb.UserId, error) { } // lookup type in extended attributes - attr, err = n.SpaceRoot.Xattr(prefixes.OwnerTypeAttr) + attr, err = n.SpaceRoot.XattrString(prefixes.OwnerTypeAttr) switch { case err == nil: owner.Type = utils.UserTypeMap(attr) @@ -596,7 +597,7 @@ func (n *Node) SetEtag(ctx context.Context, val string) (err error) { 
return nil } // etag is only valid until the calculated etag changes, is part of propagation - return n.SetXattr(prefixes.TmpEtagAttr, val) + return n.SetXattrString(prefixes.TmpEtagAttr, val) } // SetFavorite sets the favorite for the current user @@ -619,13 +620,13 @@ func (n *Node) SetEtag(ctx context.Context, val string) (err error) { func (n *Node) SetFavorite(uid *userpb.UserId, val string) error { // the favorite flag is specific to the user, so we need to incorporate the userid fa := fmt.Sprintf("%s:%s:%s@%s", prefixes.FavPrefix, utils.UserTypeToString(uid.GetType()), uid.GetOpaqueId(), uid.GetIdp()) - return n.SetXattr(fa, val) + return n.SetXattrString(fa, val) } // IsDir returns true if the node is a directory func (n *Node) IsDir() bool { - attr, _ := n.Xattr(prefixes.TypeAttr) - return attr == strconv.FormatInt(int64(provider.ResourceType_RESOURCE_TYPE_CONTAINER), 10) + attr, _ := n.XattrInt32(prefixes.TypeAttr) + return attr == int32(provider.ResourceType_RESOURCE_TYPE_CONTAINER) } // AsResourceInfo return the node as CS3 ResourceInfo @@ -637,7 +638,7 @@ func (n *Node) AsResourceInfo(ctx context.Context, rp *provider.ResourcePermissi var target string if nodeType == provider.ResourceType_RESOURCE_TYPE_REFERENCE { - target, _ = n.Xattr(prefixes.ReferenceAttr) + target, _ = n.XattrString(prefixes.ReferenceAttr) } id := &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID} @@ -697,8 +698,8 @@ func (n *Node) AsResourceInfo(ctx context.Context, rp *provider.ResourcePermissi } // use temporary etag if it is set - if b, err := n.Xattr(prefixes.TmpEtagAttr); err == nil && b != "" { - ri.Etag = fmt.Sprintf(`"%x"`, b) // TODO why do we convert string(b)? is the temporary etag stored as string? -> should we use bytes? use hex.EncodeToString? 
+ if b, err := n.XattrString(prefixes.TmpEtagAttr); err == nil && b != "" { + ri.Etag = fmt.Sprintf(`"%x"`, b) } else if ri.Etag, err = calculateEtag(n.ID, tmTime); err != nil { sublog.Debug().Err(err).Msg("could not calculate etag") } @@ -740,7 +741,7 @@ func (n *Node) AsResourceInfo(ctx context.Context, rp *provider.ResourcePermissi // the favorite flag is specific to the user, so we need to incorporate the userid if uid := u.GetId(); uid != nil { fa := fmt.Sprintf("%s:%s:%s@%s", prefixes.FavPrefix, utils.UserTypeToString(uid.GetType()), uid.GetOpaqueId(), uid.GetIdp()) - if val, err := n.Xattr(fa); err == nil { + if val, err := n.XattrString(fa); err == nil { sublog.Debug(). Str("favorite", fa). Msg("found favorite flag") @@ -814,7 +815,7 @@ func (n *Node) AsResourceInfo(ctx context.Context, rp *provider.ResourcePermissi // only read when key was requested k := key[len(prefixes.MetadataPrefix):] if _, ok := mdKeysMap[k]; returnAllMetadata || ok { - metadata[k] = value + metadata[k] = string(value) } } @@ -841,7 +842,7 @@ func (n *Node) readChecksumIntoResourceChecksum(ctx context.Context, algo string case err == nil: ri.Checksum = &provider.ResourceChecksum{ Type: storageprovider.PKG2GRPCXS(algo), - Sum: hex.EncodeToString([]byte(v)), + Sum: hex.EncodeToString(v), } case metadata.IsAttrUnset(err): appctx.GetLogger(ctx).Debug().Err(err).Str("nodepath", n.InternalPath()).Str("algorithm", algo).Msg("checksum not set") @@ -861,7 +862,7 @@ func (n *Node) readChecksumIntoOpaque(ctx context.Context, algo string, ri *prov } ri.Opaque.Map[algo] = &types.OpaqueEntry{ Decoder: "plain", - Value: []byte(hex.EncodeToString([]byte(v))), + Value: []byte(hex.EncodeToString(v)), } case metadata.IsAttrUnset(err): appctx.GetLogger(ctx).Debug().Err(err).Str("nodepath", n.InternalPath()).Str("algorithm", algo).Msg("checksum not set") @@ -872,7 +873,7 @@ func (n *Node) readChecksumIntoOpaque(ctx context.Context, algo string, ri *prov // quota is always stored on the root node func (n 
*Node) readQuotaIntoOpaque(ctx context.Context, ri *provider.ResourceInfo) { - v, err := n.Xattr(prefixes.QuotaAttr) + v, err := n.XattrString(prefixes.QuotaAttr) switch { case err == nil: // make sure we have a proper signed int @@ -902,20 +903,25 @@ func (n *Node) readQuotaIntoOpaque(ctx context.Context, ri *provider.ResourceInf // HasPropagation checks if the propagation attribute exists and is set to "1" func (n *Node) HasPropagation() (propagation bool) { - if b, err := n.Xattr(prefixes.PropagationAttr); err == nil { + if b, err := n.XattrString(prefixes.PropagationAttr); err == nil { return b == "1" } return false } -// GetTMTime reads the tmtime from the extended attributes +// GetTMTime reads the tmtime from the extended attributes, falling back to GetMTime() func (n *Node) GetTMTime() (time.Time, error) { - b, err := n.Xattr(prefixes.TreeMTimeAttr) + b, err := n.XattrString(prefixes.TreeMTimeAttr) if err == nil { return time.Parse(time.RFC3339Nano, b) } // no tmtime, use mtime + return n.GetMTime() +} + +// GetMTime reads the mtime from disk +func (n *Node) GetMTime() (time.Time, error) { fi, err := os.Lstat(n.InternalPath()) if err != nil { return time.Time{}, err @@ -928,12 +934,12 @@ func (n *Node) SetTMTime(t *time.Time) (err error) { if t == nil { return n.RemoveXattr(prefixes.TreeMTimeAttr) } - return n.SetXattr(prefixes.TreeMTimeAttr, t.UTC().Format(time.RFC3339Nano)) + return n.SetXattrString(prefixes.TreeMTimeAttr, t.UTC().Format(time.RFC3339Nano)) } // GetDTime reads the dtime from the extended attributes func (n *Node) GetDTime() (tmTime time.Time, err error) { - b, err := n.Xattr(prefixes.DTimeAttr) + b, err := n.XattrString(prefixes.DTimeAttr) if err != nil { return time.Time{}, err } @@ -945,7 +951,7 @@ func (n *Node) SetDTime(t *time.Time) (err error) { if t == nil { return n.RemoveXattr(prefixes.DTimeAttr) } - return n.SetXattr(prefixes.DTimeAttr, t.UTC().Format(time.RFC3339Nano)) + return n.SetXattrString(prefixes.DTimeAttr, 
t.UTC().Format(time.RFC3339Nano)) } // IsDisabled returns true when the node has a dmtime attribute set @@ -960,30 +966,30 @@ func (n *Node) IsDisabled() bool { // GetTreeSize reads the treesize from the extended attributes func (n *Node) GetTreeSize() (treesize uint64, err error) { - var b string - if b, err = n.Xattr(prefixes.TreesizeAttr); err != nil { - return + s, err := n.XattrInt64(prefixes.TreesizeAttr) + if err != nil { + return 0, err } - return strconv.ParseUint(b, 10, 64) + return uint64(s), nil } // SetTreeSize writes the treesize to the extended attributes func (n *Node) SetTreeSize(ts uint64) (err error) { - return n.SetXattr(prefixes.TreesizeAttr, strconv.FormatUint(ts, 10)) + return n.SetXattrString(prefixes.TreesizeAttr, strconv.FormatUint(ts, 10)) } // GetBlobSize reads the blobsize from the extended attributes func (n *Node) GetBlobSize() (treesize uint64, err error) { - var b string - if b, err = n.Xattr(prefixes.BlobsizeAttr); err != nil { - return + s, err := n.XattrInt64(prefixes.BlobsizeAttr) + if err != nil { + return 0, err } - return strconv.ParseUint(b, 10, 64) + return uint64(s), nil } // SetChecksum writes the checksum with the given checksum type to the extended attributes func (n *Node) SetChecksum(csType string, h hash.Hash) (err error) { - return n.SetXattr(prefixes.ChecksumPrefix+csType, string(h.Sum(nil))) + return n.SetXattr(prefixes.ChecksumPrefix+csType, h.Sum(nil)) } // UnsetTempEtag removes the temporary etag attribute @@ -1105,7 +1111,7 @@ func (n *Node) ReadGrant(ctx context.Context, grantee string) (g *provider.Grant return nil, err } var e *ace.ACE - if e, err = ace.Unmarshal(strings.TrimPrefix(grantee, prefixes.GrantPrefix), []byte(xattr)); err != nil { + if e, err = ace.Unmarshal(strings.TrimPrefix(grantee, prefixes.GrantPrefix), xattr); err != nil { return nil, err } return e.Grant(), nil @@ -1193,7 +1199,7 @@ func (n *Node) FindStorageSpaceRoot() error { // UnmarkProcessing removes the processing flag from the node 
func (n *Node) UnmarkProcessing(uploadID string) error { - v, _ := n.Xattr(prefixes.StatusPrefix) + v, _ := n.XattrString(prefixes.StatusPrefix) if v != ProcessingStatus+uploadID { // file started another postprocessing later - do not remove return nil @@ -1203,7 +1209,7 @@ func (n *Node) UnmarkProcessing(uploadID string) error { // IsProcessing returns true if the node is currently being processed func (n *Node) IsProcessing() bool { - v, err := n.Xattr(prefixes.StatusPrefix) + v, err := n.XattrString(prefixes.StatusPrefix) return err == nil && strings.HasPrefix(v, ProcessingStatus) } @@ -1215,15 +1221,15 @@ func (n *Node) IsSpaceRoot() bool { // SetScanData sets the virus scan info to the node func (n *Node) SetScanData(info string, date time.Time) error { - return n.SetXattrs(map[string]string{ - prefixes.ScanStatusPrefix: info, - prefixes.ScanDatePrefix: date.Format(time.RFC3339Nano), - }, true) + attribs := Attributes{} + attribs.SetString(prefixes.ScanStatusPrefix, info) + attribs.SetString(prefixes.ScanDatePrefix, date.Format(time.RFC3339Nano)) + return n.SetXattrs(attribs, true) } // ScanData returns scanning information of the node func (n *Node) ScanData() (scanned bool, virus string, scantime time.Time) { - ti, _ := n.Xattr(prefixes.ScanDatePrefix) + ti, _ := n.XattrString(prefixes.ScanDatePrefix) if ti == "" { return // not scanned yet } @@ -1233,7 +1239,7 @@ func (n *Node) ScanData() (scanned bool, virus string, scantime time.Time) { return } - i, err := n.Xattr(prefixes.ScanStatusPrefix) + i, err := n.XattrString(prefixes.ScanStatusPrefix) if err != nil { return } @@ -1251,10 +1257,19 @@ var CheckQuota = func(spaceRoot *Node, overwrite bool, oldSize, newSize uint64) if !enoughDiskSpace(spaceRoot.InternalPath(), newSize) { return false, errtypes.InsufficientStorage("disk full") } - quotaByteStr, _ := spaceRoot.Xattr(prefixes.QuotaAttr) - if quotaByteStr == "" || quotaByteStr == QuotaUnlimited { + quotaByteStr, _ := 
spaceRoot.XattrString(prefixes.QuotaAttr) + switch quotaByteStr { + case "": // if quota is not set, it means unlimited return true, nil + case QuotaUnlimited: + return true, nil + case QuotaUncalculated: + // treat it as unlimited + return true, nil + case QuotaUnknown: + // treat it as unlimited + return true, nil } quotaByte, _ := strconv.ParseUint(quotaByteStr, 10, 64) if overwrite { diff --git a/pkg/storage/utils/decomposedfs/node/node_test.go b/pkg/storage/utils/decomposedfs/node/node_test.go index bd3771bc2d..6d3c99bbd6 100644 --- a/pkg/storage/utils/decomposedfs/node/node_test.go +++ b/pkg/storage/utils/decomposedfs/node/node_test.go @@ -94,7 +94,7 @@ var _ = Describe("Node", func() { n.BlobID = "TestBlobID" n.Blobsize = blobsize - err = n.WriteAllNodeMetadata() + err = n.WriteAllNodeMetadata(env.Ctx) Expect(err).ToNot(HaveOccurred()) n2, err := env.Lookup.NodeFromResource(env.Ctx, ref) Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/storage/utils/decomposedfs/node/xattrs.go b/pkg/storage/utils/decomposedfs/node/xattrs.go index f7cb51a964..506d3d047a 100644 --- a/pkg/storage/utils/decomposedfs/node/xattrs.go +++ b/pkg/storage/utils/decomposedfs/node/xattrs.go @@ -19,11 +19,36 @@ package node import ( + "strconv" + "github.com/pkg/xattr" ) +// Attributes is a map of string keys and byte array values +type Attributes map[string][]byte + +// String reads a String value +func (md Attributes) String(key string) string { + return string(md[key]) +} + +// SetString sets a string value +func (md Attributes) SetString(key, val string) { + md[key] = []byte(val) +} + +// Int64 reads an int64 value +func (md Attributes) Int64(key string) (int64, error) { + return strconv.ParseInt(string(md[key]), 10, 64) +} + +// SetInt64 sets an int64 value +func (md Attributes) SetInt64(key string, val int64) { + md[key] = []byte(strconv.FormatInt(val, 10)) +} + // SetXattrs sets multiple extended attributes on the write-through cache/node -func (n *Node) SetXattrs(attribs 
map[string]string, acquireLock bool) (err error) { +func (n *Node) SetXattrs(attribs map[string][]byte, acquireLock bool) (err error) { if n.xattrsCache != nil { for k, v := range attribs { n.xattrsCache[k] = v @@ -34,7 +59,7 @@ func (n *Node) SetXattrs(attribs map[string]string, acquireLock bool) (err error } // SetXattr sets an extended attribute on the write-through cache/node -func (n *Node) SetXattr(key, val string) (err error) { +func (n *Node) SetXattr(key string, val []byte) (err error) { if n.xattrsCache != nil { n.xattrsCache[key] = val } @@ -42,6 +67,15 @@ func (n *Node) SetXattr(key, val string) (err error) { return n.lu.MetadataBackend().Set(n.InternalPath(), key, val) } +// SetXattrString sets a string extended attribute on the write-through cache/node +func (n *Node) SetXattrString(key, val string) (err error) { + if n.xattrsCache != nil { + n.xattrsCache[key] = []byte(val) + } + + return n.lu.MetadataBackend().Set(n.InternalPath(), key, []byte(val)) +} + // RemoveXattr removes an extended attribute from the write-through cache/node func (n *Node) RemoveXattr(key string) error { if n.xattrsCache != nil { @@ -52,7 +86,7 @@ func (n *Node) RemoveXattr(key string) error { // Xattrs returns the extended attributes of the node. If the attributes have already // been cached they are not read from disk again. -func (n *Node) Xattrs() (map[string]string, error) { +func (n *Node) Xattrs() (Attributes, error) { if n.xattrsCache != nil { return n.xattrsCache, nil } @@ -67,11 +101,11 @@ func (n *Node) Xattrs() (map[string]string, error) { // Xattr returns an extended attribute of the node. If the attributes have already // been cached it is not read from disk again. 
-func (n *Node) Xattr(key string) (string, error) { +func (n *Node) Xattr(key string) ([]byte, error) { if n.xattrsCache == nil { attrs, err := n.lu.MetadataBackend().All(n.InternalPath()) if err != nil { - return "", err + return []byte{}, err } n.xattrsCache = attrs } @@ -80,5 +114,37 @@ func (n *Node) Xattr(key string) (string, error) { return val, nil } // wrap the error as xattr does - return "", &xattr.Error{Op: "xattr.get", Path: n.InternalPath(), Name: key, Err: xattr.ENOATTR} + return []byte{}, &xattr.Error{Op: "xattr.get", Path: n.InternalPath(), Name: key, Err: xattr.ENOATTR} +} + +// XattrString returns the string representation of an attribute +func (n *Node) XattrString(key string) (string, error) { + b, err := n.Xattr(key) + if err != nil { + return "", err + } + return string(b), nil +} + +// XattrInt32 returns the int32 representation of an attribute +func (n *Node) XattrInt32(key string) (int32, error) { + b, err := n.XattrString(key) + if err != nil { + return 0, err + } + + typeInt, err := strconv.ParseInt(b, 10, 32) + if err != nil { + return 0, err + } + return int32(typeInt), nil +} + +// XattrInt64 returns the int64 representation of an attribute +func (n *Node) XattrInt64(key string) (int64, error) { + b, err := n.XattrString(key) + if err != nil { + return 0, err + } + return strconv.ParseInt(b, 10, 64) } diff --git a/pkg/storage/utils/decomposedfs/recycle.go b/pkg/storage/utils/decomposedfs/recycle.go index 398a78bc4a..18598ef980 100644 --- a/pkg/storage/utils/decomposedfs/recycle.go +++ b/pkg/storage/utils/decomposedfs/recycle.go @@ -94,7 +94,7 @@ func (fs *Decomposedfs) ListRecycle(ctx context.Context, ref *provider.Reference } // lookup origin path in extended attributes if attrBytes, ok := attrs[prefixes.TrashOriginAttr]; ok { - origin = attrBytes + origin = string(attrBytes) } else { sublog.Error().Err(err).Str("space", spaceID).Msg("could not read origin path, skipping") return nil, err @@ -114,7 +114,7 @@ func (fs *Decomposedfs) 
ListRecycle(ctx context.Context, ref *provider.Reference nodeType := fs.lu.TypeFromPath(originalPath) if nodeType != provider.ResourceType_RESOURCE_TYPE_CONTAINER { // this is the case when we want to directly list a file in the trashbin - blobsize, err := strconv.ParseInt(attrs[prefixes.BlobsizeAttr], 10, 64) + blobsize, err := strconv.ParseInt(string(attrs[prefixes.BlobsizeAttr]), 10, 64) if err != nil { return items, err } @@ -168,7 +168,7 @@ func (fs *Decomposedfs) ListRecycle(ctx context.Context, ref *provider.Reference sublog.Error().Err(err).Str("name", name).Msg("invalid tree size, skipping") continue } - size, err = strconv.ParseInt(attr, 10, 64) + size, err = strconv.ParseInt(string(attr), 10, 64) if err != nil { sublog.Error().Err(err).Str("name", name).Msg("invalid tree size, skipping") continue @@ -263,7 +263,7 @@ func (fs *Decomposedfs) listTrashRoot(ctx context.Context, spaceID string) ([]*p // lookup origin path in extended attributes if attr, ok := attrs[prefixes.TrashOriginAttr]; ok { - item.Ref = &provider.Reference{Path: attr} + item.Ref = &provider.Reference{Path: string(attr)} } else { log.Error().Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not read origin path, skipping") continue diff --git a/pkg/storage/utils/decomposedfs/spaces.go b/pkg/storage/utils/decomposedfs/spaces.go index d2445bdb82..015a383347 100644 --- a/pkg/storage/utils/decomposedfs/spaces.go +++ b/pkg/storage/utils/decomposedfs/spaces.go @@ -25,7 +25,6 @@ import ( "math" "os" "path/filepath" - "strconv" "strings" "time" @@ -102,7 +101,7 @@ func (fs *Decomposedfs) CreateStorageSpace(ctx context.Context, req *provider.Cr return nil, errors.Wrap(err, "Decomposedfs: error creating node") } - if err := root.WriteAllNodeMetadata(); err != nil { + if err := root.WriteAllNodeMetadata(ctx); err != nil { return nil, err } var owner *userv1beta1.UserId @@ -120,16 +119,16 @@ func (fs *Decomposedfs) CreateStorageSpace(ctx 
context.Context, req *provider.Cr return nil, err } - metadata := make(map[string]string, 6) + metadata := make(node.Attributes, 6) // always enable propagation on the storage space root // mark the space root node as the end of propagation - metadata[prefixes.PropagationAttr] = "1" - metadata[prefixes.NameAttr] = req.Name - metadata[prefixes.SpaceNameAttr] = req.Name + metadata.SetString(prefixes.PropagationAttr, "1") + metadata.SetString(prefixes.NameAttr, req.Name) + metadata.SetString(prefixes.SpaceNameAttr, req.Name) if req.Type != "" { - metadata[prefixes.SpaceTypeAttr] = req.Type + metadata.SetString(prefixes.SpaceTypeAttr, req.Type) } if q := req.GetQuota(); q != nil { @@ -137,19 +136,19 @@ func (fs *Decomposedfs) CreateStorageSpace(ctx context.Context, req *provider.Cr if fs.o.MaxQuota != quotaUnrestricted && q.GetQuotaMaxBytes() > fs.o.MaxQuota { return nil, errtypes.BadRequest("decompsedFS: requested quota is higher than allowed") } - metadata[prefixes.QuotaAttr] = strconv.FormatUint(q.QuotaMaxBytes, 10) + metadata.SetInt64(prefixes.QuotaAttr, int64(q.QuotaMaxBytes)) } else if fs.o.MaxQuota != quotaUnrestricted { // If no quota was requested but a max quota was set then the the storage space has a quota // of max quota. - metadata[prefixes.QuotaAttr] = strconv.FormatUint(fs.o.MaxQuota, 10) + metadata.SetInt64(prefixes.QuotaAttr, int64(fs.o.MaxQuota)) } if description != "" { - metadata[prefixes.SpaceDescriptionAttr] = description + metadata.SetString(prefixes.SpaceDescriptionAttr, description) } if alias != "" { - metadata[prefixes.SpaceAliasAttr] = alias + metadata.SetString(prefixes.SpaceAliasAttr, alias) } if err := root.SetXattrs(metadata, true); err != nil { @@ -341,10 +340,15 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide for match := range matches { var err error + // TODO introduce metadata.IsLockFile(path) // do not investigate flock files any further. They indicate file locks but are not relevant here. 
if strings.HasSuffix(match, filelocks.LockFileSuffix) { continue } + // skip metadata files + if fs.lu.MetadataBackend().IsMetaFile(match) { + continue + } // always read link in case storage space id != node id spaceID, nodeID, err = ReadSpaceAndNodeFromIndexLink(match) if err != nil { @@ -433,10 +437,10 @@ func (fs *Decomposedfs) UpdateStorageSpace(ctx context.Context, req *provider.Up space := req.StorageSpace _, spaceID, _, _ := storagespace.SplitID(space.Id.OpaqueId) - metadata := make(map[string]string, 5) + metadata := make(node.Attributes, 5) if space.Name != "" { - metadata[prefixes.NameAttr] = space.Name - metadata[prefixes.SpaceNameAttr] = space.Name + metadata.SetString(prefixes.NameAttr, space.Name) + metadata.SetString(prefixes.SpaceNameAttr, space.Name) } if space.Quota != nil { @@ -448,16 +452,16 @@ func (fs *Decomposedfs) UpdateStorageSpace(ctx context.Context, req *provider.Up // If the caller wants to unrestrict the space we give it the maximum allowed quota. space.Quota.QuotaMaxBytes = fs.o.MaxQuota } - metadata[prefixes.QuotaAttr] = strconv.FormatUint(space.Quota.QuotaMaxBytes, 10) + metadata.SetInt64(prefixes.QuotaAttr, int64(space.Quota.QuotaMaxBytes)) } // TODO also return values which are not in the request if space.Opaque != nil { if description, ok := space.Opaque.Map["description"]; ok { - metadata[prefixes.SpaceDescriptionAttr] = string(description.Value) + metadata[prefixes.SpaceDescriptionAttr] = description.Value } if alias := utils.ReadPlainFromOpaque(space.Opaque, "spaceAlias"); alias != "" { - metadata[prefixes.SpaceAliasAttr] = alias + metadata.SetString(prefixes.SpaceAliasAttr, alias) } if image := utils.ReadPlainFromOpaque(space.Opaque, "image"); image != "" { imageID, err := storagespace.ParseID(image) @@ -466,7 +470,7 @@ func (fs *Decomposedfs) UpdateStorageSpace(ctx context.Context, req *provider.Up Status: &v1beta11.Status{Code: v1beta11.Code_CODE_NOT_FOUND, Message: "decomposedFS: space image resource not found"}, }, nil } 
- metadata[prefixes.SpaceImageAttr] = imageID.OpaqueId + metadata.SetString(prefixes.SpaceImageAttr, imageID.OpaqueId) } if readme := utils.ReadPlainFromOpaque(space.Opaque, "readme"); readme != "" { readmeID, err := storagespace.ParseID(readme) @@ -475,7 +479,7 @@ func (fs *Decomposedfs) UpdateStorageSpace(ctx context.Context, req *provider.Up Status: &v1beta11.Status{Code: v1beta11.Code_CODE_NOT_FOUND, Message: "decomposedFS: space readme resource not found"}, }, nil } - metadata[prefixes.SpaceReadmeAttr] = readmeID.OpaqueId + metadata.SetString(prefixes.SpaceReadmeAttr, readmeID.OpaqueId) } } @@ -574,7 +578,7 @@ func (fs *Decomposedfs) DeleteStorageSpace(ctx context.Context, req *provider.De return err } - st, err := n.SpaceRoot.Xattr(prefixes.SpaceTypeAttr) + st, err := n.SpaceRoot.XattrString(prefixes.SpaceTypeAttr) if err != nil { return errtypes.InternalError(fmt.Sprintf("space %s does not have a spacetype, possible corrupt decompsedfs", n.ID)) } @@ -604,7 +608,7 @@ func (fs *Decomposedfs) DeleteStorageSpace(ctx context.Context, req *provider.De return errtypes.NewErrtypeFromStatus(status.NewInvalid(ctx, "can't purge enabled space")) } - spaceType, err := n.Xattr(prefixes.SpaceTypeAttr) + spaceType, err := n.XattrString(prefixes.SpaceTypeAttr) if err != nil { return err } @@ -710,7 +714,7 @@ func (fs *Decomposedfs) storageSpaceFromNode(ctx context.Context, n *node.Node, var err error // TODO apply more filters var sname string - if sname, err = n.SpaceRoot.Xattr(prefixes.SpaceNameAttr); err != nil { + if sname, err = n.SpaceRoot.XattrString(prefixes.SpaceNameAttr); err != nil { // FIXME: Is that a severe problem? 
appctx.GetLogger(ctx).Debug().Err(err).Msg("space does not have a name attribute") } @@ -815,7 +819,8 @@ func (fs *Decomposedfs) storageSpaceFromNode(ctx context.Context, n *node.Node, // Mtime is set either as node.tmtime or as fi.mtime below } - if space.SpaceType, err = n.SpaceRoot.Xattr(prefixes.SpaceTypeAttr); err != nil { + space.SpaceType, err = n.SpaceRoot.XattrString(prefixes.SpaceTypeAttr) + if err != nil { appctx.GetLogger(ctx).Debug().Err(err).Msg("space does not have a type attribute") } @@ -863,42 +868,33 @@ func (fs *Decomposedfs) storageSpaceFromNode(ctx context.Context, n *node.Node, return nil, err } - // quota - quotaAttr, ok := spaceAttributes[prefixes.QuotaAttr] - if ok { + // if quota is set try parsing it as int64, otherwise don't bother + if q, err := spaceAttributes.Int64(prefixes.QuotaAttr); err == nil && q >= 0 { // make sure we have a proper signed int // we use the same magic numbers to indicate: // -1 = uncalculated // -2 = unknown // -3 = unlimited - if quota, err := strconv.ParseUint(quotaAttr, 10, 64); err == nil { - space.Quota = &provider.Quota{ - QuotaMaxBytes: quota, - QuotaMaxFiles: math.MaxUint64, // TODO MaxUInt64? = unlimited? why even max files? 0 = unlimited? - } - } else { - return nil, err + space.Quota = &provider.Quota{ + QuotaMaxBytes: uint64(q), + QuotaMaxFiles: math.MaxUint64, // TODO MaxUInt64? = unlimited? why even max files? 0 = unlimited? 
} } - spaceImage, ok := spaceAttributes[prefixes.SpaceImageAttr] - if ok { + if si := spaceAttributes.String(prefixes.SpaceImageAttr); si != "" { space.Opaque = utils.AppendPlainToOpaque(space.Opaque, "image", storagespace.FormatResourceID( - provider.ResourceId{StorageId: space.Root.StorageId, SpaceId: space.Root.SpaceId, OpaqueId: spaceImage}, + provider.ResourceId{StorageId: space.Root.StorageId, SpaceId: space.Root.SpaceId, OpaqueId: si}, )) } - spaceDescription, ok := spaceAttributes[prefixes.SpaceDescriptionAttr] - if ok { - space.Opaque = utils.AppendPlainToOpaque(space.Opaque, "description", spaceDescription) + if sd := spaceAttributes.String(prefixes.SpaceDescriptionAttr); sd != "" { + space.Opaque = utils.AppendPlainToOpaque(space.Opaque, "description", sd) } - spaceReadme, ok := spaceAttributes[prefixes.SpaceReadmeAttr] - if ok { + if sr := spaceAttributes.String(prefixes.SpaceReadmeAttr); sr != "" { space.Opaque = utils.AppendPlainToOpaque(space.Opaque, "readme", storagespace.FormatResourceID( - provider.ResourceId{StorageId: space.Root.StorageId, SpaceId: space.Root.SpaceId, OpaqueId: spaceReadme}, + provider.ResourceId{StorageId: space.Root.StorageId, SpaceId: space.Root.SpaceId, OpaqueId: sr}, )) } - spaceAlias, ok := spaceAttributes[prefixes.SpaceAliasAttr] - if ok { - space.Opaque = utils.AppendPlainToOpaque(space.Opaque, "spaceAlias", spaceAlias) + if sa := spaceAttributes.String(prefixes.SpaceAliasAttr); sa != "" { + space.Opaque = utils.AppendPlainToOpaque(space.Opaque, "spaceAlias", sa) } // add rootinfo @@ -907,7 +903,7 @@ func (fs *Decomposedfs) storageSpaceFromNode(ctx context.Context, n *node.Node, return space, nil } -func mapHasKey(checkMap map[string]string, keys ...string) bool { +func mapHasKey(checkMap map[string][]byte, keys ...string) bool { for _, key := range keys { if _, hasKey := checkMap[key]; hasKey { return true diff --git a/pkg/storage/utils/decomposedfs/testhelpers/helpers.go 
b/pkg/storage/utils/decomposedfs/testhelpers/helpers.go index 3e2bb412f8..26a6d61f48 100644 --- a/pkg/storage/utils/decomposedfs/testhelpers/helpers.go +++ b/pkg/storage/utils/decomposedfs/testhelpers/helpers.go @@ -219,7 +219,7 @@ func (t *TestEnv) CreateTestFile(name, blobID, parentID, spaceID string, blobSiz if err != nil { return nil, err } - err = n.WriteAllNodeMetadata() + err = n.WriteAllNodeMetadata(context.Background()) if err != nil { return nil, err } @@ -277,7 +277,7 @@ func (t *TestEnv) CreateTestStorageSpace(typ string, quota *providerv1beta1.Quot if err != nil { return nil, err } - if err = h.SetXattr(prefixes.SpaceNameAttr, "username"); err != nil { + if err = h.SetXattr(prefixes.SpaceNameAttr, []byte("username")); err != nil { return nil, err } diff --git a/pkg/storage/utils/decomposedfs/tree/migrations.go b/pkg/storage/utils/decomposedfs/tree/migrations.go index eb5032d07c..61235ba113 100644 --- a/pkg/storage/utils/decomposedfs/tree/migrations.go +++ b/pkg/storage/utils/decomposedfs/tree/migrations.go @@ -59,7 +59,7 @@ func (t *Tree) migration0001Nodes() error { nodePath := filepath.Join(nodesPath, n.Name()) attr, err := t.lookup.MetadataBackend().Get(nodePath, prefixes.ParentidAttr) - if err == nil && attr == node.RootID { + if err == nil && string(attr) == node.RootID { if err := t.moveNode(n.Name(), n.Name()); err != nil { logger.New().Error().Err(err). Str("space", n.Name()). 
diff --git a/pkg/storage/utils/decomposedfs/tree/tree.go b/pkg/storage/utils/decomposedfs/tree/tree.go index 48c6a79f78..8b2aac59d4 100644 --- a/pkg/storage/utils/decomposedfs/tree/tree.go +++ b/pkg/storage/utils/decomposedfs/tree/tree.go @@ -44,6 +44,7 @@ import ( "github.com/cs3org/reva/v2/pkg/utils" "github.com/google/uuid" "github.com/pkg/errors" + "github.com/rogpeppe/go-internal/lockedfile" "github.com/rs/zerolog/log" ) @@ -264,7 +265,7 @@ func (t *Tree) TouchFile(ctx context.Context, n *node.Node) error { return errors.Wrap(err, "Decomposedfs: error creating node") } - err = n.WriteAllNodeMetadata() + err = n.WriteAllNodeMetadata(ctx) if err != nil { return err } @@ -300,7 +301,7 @@ func (t *Tree) CreateDir(ctx context.Context, n *node.Node) (err error) { n.ID = uuid.New().String() } - err = t.createDirNode(n) + err = t.createDirNode(ctx, n) if err != nil { return } @@ -366,7 +367,7 @@ func (t *Tree) Move(ctx context.Context, oldNode *node.Node, newNode *node.Node) } // update name attribute - if err := oldNode.SetXattr(prefixes.NameAttr, newNode.Name); err != nil { + if err := oldNode.SetXattrString(prefixes.NameAttr, newNode.Name); err != nil { return errors.Wrap(err, "Decomposedfs: could not set name attribute") } @@ -386,11 +387,11 @@ func (t *Tree) Move(ctx context.Context, oldNode *node.Node, newNode *node.Node) } // update target parentid and name - if err := oldNode.SetXattr(prefixes.ParentidAttr, newNode.ParentID); err != nil { - return errors.Wrap(err, "Decomposedfs: could not set parentid attribute") - } - if err := oldNode.SetXattr(prefixes.NameAttr, newNode.Name); err != nil { - return errors.Wrap(err, "Decomposedfs: could not set name attribute") + attribs := node.Attributes{} + attribs.SetString(prefixes.ParentidAttr, newNode.ParentID) + attribs.SetString(prefixes.NameAttr, newNode.Name) + if err := oldNode.SetXattrs(attribs, true); err != nil { + return errors.Wrap(err, "Decomposedfs: could not update 
old node attributes") } // the size diff is the current treesize or blobsize of the old/source node @@ -487,7 +488,7 @@ func (t *Tree) Delete(ctx context.Context, n *node.Node) (err error) { // set origin location in metadata nodePath := n.InternalPath() - if err := n.SetXattr(prefixes.TrashOriginAttr, origin); err != nil { + if err := n.SetXattrString(prefixes.TrashOriginAttr, origin); err != nil { return err } @@ -616,16 +617,16 @@ func (t *Tree) RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPa } targetNode.Exists = true - // update name attribute - if err := recycleNode.SetXattr(prefixes.NameAttr, targetNode.Name); err != nil { - return errors.Wrap(err, "Decomposedfs: could not set name attribute") - } - // set ParentidAttr to restorePath's node parent id + attrs := node.Attributes{} + attrs.SetString(prefixes.NameAttr, targetNode.Name) if trashPath != "" { - if err := recycleNode.SetXattr(prefixes.ParentidAttr, targetNode.ParentID); err != nil { - return errors.Wrap(err, "Decomposedfs: could not set name attribute") - } + // set ParentidAttr to restorePath's node parent id + attrs.SetString(prefixes.ParentidAttr, targetNode.ParentID) + } + + if err = recycleNode.SetXattrs(attrs, true); err != nil { + return errors.Wrap(err, "Decomposedfs: could not update recycle node") } // delete item link in trash @@ -775,21 +776,29 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err } if t.treeTimeAccounting || (t.treeSizeAccounting && sizeDiff != 0) { - attrs := map[string]string{} + attrs := node.Attributes{} + var f *lockedfile.File // lock node before reading treesize or tree time - nodeLock, err := filelocks.AcquireWriteLock(n.InternalPath()) + switch t.lookup.MetadataBackend().(type) { + case metadata.MessagePackBackend: + f, err = lockedfile.OpenFile(t.lookup.MetadataBackend().MetadataPath(n.InternalPath()), os.O_RDWR|os.O_CREATE, 0600) + case metadata.XattrsBackend: + // we have to use dedicated lockfiles to lock 
directories + // this only works because the xattr backend also locks folders with separate lock files + f, err = lockedfile.OpenFile(n.InternalPath()+filelocks.LockFileSuffix, os.O_RDWR|os.O_CREATE, 0600) + } if err != nil { return err } - // always unlock node - releaseLock := func() { - // ReleaseLock returns nil if already unlocked - if err := filelocks.ReleaseLock(nodeLock); err != nil { - sublog.Err(err).Msg("Decomposedfs: could not unlock parent node") + // always log error if closing node fails + defer func() { + // ignore already closed error + cerr := f.Close() + if err == nil && cerr != nil && !errors.Is(cerr, os.ErrClosed) { + err = cerr // only overwrite err with en error from close if the former was nil } - } - defer releaseLock() + }() if t.treeTimeAccounting { // update the parent tree time if it is older than the nodes mtime @@ -819,10 +828,10 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err if updateSyncTime { // update the tree time of the parent node - attrs[prefixes.TreeMTimeAttr] = sTime.UTC().Format(time.RFC3339Nano) + attrs.SetString(prefixes.TreeMTimeAttr, sTime.UTC().Format(time.RFC3339Nano)) } - attrs[prefixes.TmpEtagAttr] = "" + attrs.SetString(prefixes.TmpEtagAttr, "") } // size accounting @@ -849,7 +858,7 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err } // update the tree size of the node - attrs[prefixes.TreesizeAttr] = strconv.FormatUint(newSize, 10) + attrs.SetString(prefixes.TreesizeAttr, strconv.FormatUint(newSize, 10)) sublog.Debug().Uint64("newSize", newSize).Msg("updated treesize of parent node") } @@ -857,10 +866,10 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err return err } - // Release node lock early, returns nil if already unlocked - err = filelocks.ReleaseLock(nodeLock) - if err != nil { - return errtypes.InternalError(err.Error()) + // Release node lock early, ignore already closed error + cerr := f.Close() + if 
cerr != nil && !errors.Is(cerr, os.ErrClosed) { + return cerr } } } @@ -901,10 +910,10 @@ func (t *Tree) calculateTreeSize(ctx context.Context, childrenPath string) (uint continue // continue after an error } sizeAttr := "" - if attribs[prefixes.TypeAttr] == strconv.FormatUint(uint64(provider.ResourceType_RESOURCE_TYPE_FILE), 10) { - sizeAttr = attribs[prefixes.BlobsizeAttr] + if string(attribs[prefixes.TypeAttr]) == strconv.FormatUint(uint64(provider.ResourceType_RESOURCE_TYPE_FILE), 10) { + sizeAttr = string(attribs[prefixes.BlobsizeAttr]) } else { - sizeAttr = attribs[prefixes.TreesizeAttr] + sizeAttr = string(attribs[prefixes.TreesizeAttr]) } csize, err := strconv.ParseInt(sizeAttr, 10, 64) if err != nil { @@ -942,14 +951,14 @@ func (t *Tree) DeleteBlob(node *node.Node) error { } // TODO check if node exists? -func (t *Tree) createDirNode(n *node.Node) (err error) { +func (t *Tree) createDirNode(ctx context.Context, n *node.Node) (err error) { // create a directory node nodePath := n.InternalPath() if err := os.MkdirAll(nodePath, 0700); err != nil { return errors.Wrap(err, "Decomposedfs: error creating node") } - return n.WriteAllNodeMetadata() + return n.WriteAllNodeMetadata(ctx) } var nodeIDRegep = regexp.MustCompile(`.*/nodes/([^.]*).*`) @@ -982,10 +991,10 @@ func (t *Tree) readRecycleItem(ctx context.Context, spaceID, key, path string) ( } recycleNode.SetType(t.lookup.TypeFromPath(recycleNode.InternalPath())) - var attrStr string + var attrBytes []byte // lookup blobID in extended attributes - if attrStr, err = backend.Get(deletedNodePath, prefixes.BlobIDAttr); err == nil { - recycleNode.BlobID = attrStr + if attrBytes, err = backend.Get(deletedNodePath, prefixes.BlobIDAttr); err == nil { + recycleNode.BlobID = string(attrBytes) } else { return } @@ -996,15 +1005,15 @@ func (t *Tree) readRecycleItem(ctx context.Context, spaceID, key, path string) ( } // lookup parent id in extended attributes - if attrStr, err = backend.Get(deletedNodePath, 
prefixes.ParentidAttr); err == nil { - recycleNode.ParentID = attrStr + if attrBytes, err = backend.Get(deletedNodePath, prefixes.ParentidAttr); err == nil { + recycleNode.ParentID = string(attrBytes) } else { return } // lookup name in extended attributes - if attrStr, err = backend.Get(deletedNodePath, prefixes.NameAttr); err == nil { - recycleNode.Name = attrStr + if attrBytes, err = backend.Get(deletedNodePath, prefixes.NameAttr); err == nil { + recycleNode.Name = string(attrBytes) } else { return } @@ -1013,8 +1022,8 @@ func (t *Tree) readRecycleItem(ctx context.Context, spaceID, key, path string) ( origin = "/" // lookup origin path in extended attributes - if attrStr, err = backend.Get(resolvedTrashItem, prefixes.TrashOriginAttr); err == nil { - origin = filepath.Join(attrStr, path) + if attrBytes, err = backend.Get(resolvedTrashItem, prefixes.TrashOriginAttr); err == nil { + origin = filepath.Join(string(attrBytes), path) } else { log.Error().Err(err).Str("trashItem", trashItem).Str("deletedNodePath", deletedNodePath).Msg("could not read origin path, restoring to /") } diff --git a/pkg/storage/utils/decomposedfs/tree/tree_test.go b/pkg/storage/utils/decomposedfs/tree/tree_test.go index c1ad5b1931..67e3092797 100644 --- a/pkg/storage/utils/decomposedfs/tree/tree_test.go +++ b/pkg/storage/utils/decomposedfs/tree/tree_test.go @@ -124,7 +124,7 @@ var _ = Describe("Tree", func() { attr, err := env.Lookup.MetadataBackend().Get(resolveTrashPath, prefixes.TrashOriginAttr) Expect(err).ToNot(HaveOccurred()) - Expect(attr).To(Equal("/dir1/file1")) + Expect(string(attr)).To(Equal("/dir1/file1")) }) It("does not delete the blob from the blobstore", func() { @@ -401,7 +401,7 @@ var _ = Describe("Tree", func() { stopdir, err := env.CreateTestDir("testdir/stophere", &provider.Reference{ResourceId: env.SpaceRootRes}) Expect(err).ToNot(HaveOccurred()) - err = stopdir.SetXattr(prefixes.PropagationAttr, "0") + err = stopdir.SetXattrString(prefixes.PropagationAttr, "0") 
Expect(err).ToNot(HaveOccurred()) otherdir, err := env.CreateTestDir("testdir/stophere/lotsofbytes", &provider.Reference{ResourceId: env.SpaceRootRes}) Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/storage/utils/decomposedfs/upload/processing.go b/pkg/storage/utils/decomposedfs/upload/processing.go index b9195c7817..e95ec7aedd 100644 --- a/pkg/storage/utils/decomposedfs/upload/processing.go +++ b/pkg/storage/utils/decomposedfs/upload/processing.go @@ -38,15 +38,15 @@ import ( "github.com/cs3org/reva/v2/pkg/logger" "github.com/cs3org/reva/v2/pkg/storage/utils/chunking" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" - "github.com/cs3org/reva/v2/pkg/storage/utils/filelocks" "github.com/cs3org/reva/v2/pkg/storagespace" "github.com/cs3org/reva/v2/pkg/utils" - "github.com/gofrs/flock" "github.com/google/uuid" "github.com/pkg/errors" + "github.com/rogpeppe/go-internal/lockedfile" tusd "github.com/tus/tusd/pkg/handler" ) @@ -237,7 +237,7 @@ func Get(ctx context.Context, id string, lu *lookup.Lookup, tp Tree, fsRoot stri } // CreateNodeForUpload will create the target node for the Upload -func CreateNodeForUpload(upload *Upload, initAttrs map[string]string) (*node.Node, error) { +func CreateNodeForUpload(upload *Upload, initAttrs node.Attributes) (*node.Node, error) { fi, err := os.Stat(upload.binPath) if err != nil { return nil, err @@ -266,26 +266,29 @@ func CreateNodeForUpload(upload *Upload, initAttrs map[string]string) (*node.Nod return nil, err } - var lock *flock.Flock + var f *lockedfile.File switch n.ID { case "": - lock, err = initNewNode(upload, n, 
uint64(fsize)) + f, err = initNewNode(upload, n, uint64(fsize)) default: - lock, err = updateExistingNode(upload, n, spaceID, uint64(fsize)) + f, err = updateExistingNode(upload, n, spaceID, uint64(fsize)) } - - defer filelocks.ReleaseLock(lock) //nolint:errcheck + defer func() { + if err := f.Close(); err != nil { + appctx.GetLogger(upload.Ctx).Error().Err(err).Str("nodeid", n.ID).Str("parentid", n.ParentID).Msg("could not close lock") + } + }() if err != nil { return nil, err } // overwrite technical information - initAttrs[prefixes.TypeAttr] = strconv.FormatInt(int64(n.Type()), 10) - initAttrs[prefixes.ParentidAttr] = n.ParentID - initAttrs[prefixes.NameAttr] = n.Name - initAttrs[prefixes.BlobIDAttr] = n.BlobID - initAttrs[prefixes.BlobsizeAttr] = strconv.FormatInt(n.Blobsize, 10) - initAttrs[prefixes.StatusPrefix] = node.ProcessingStatus + upload.Info.ID + initAttrs.SetInt64(prefixes.TypeAttr, int64(provider.ResourceType_RESOURCE_TYPE_FILE)) + initAttrs.SetString(prefixes.ParentidAttr, n.ParentID) + initAttrs.SetString(prefixes.NameAttr, n.Name) + initAttrs.SetString(prefixes.BlobIDAttr, n.BlobID) + initAttrs.SetInt64(prefixes.BlobsizeAttr, n.Blobsize) + initAttrs.SetString(prefixes.StatusPrefix, node.ProcessingStatus+upload.Info.ID) // update node metadata with new blobid etc err = n.SetXattrs(initAttrs, false) @@ -301,7 +304,7 @@ func CreateNodeForUpload(upload *Upload, initAttrs map[string]string) (*node.Nod } // add etag to metadata - tmtime, err := n.GetTMTime() + tmtime, err := n.GetMTime() if err != nil { return nil, err } @@ -316,7 +319,7 @@ func CreateNodeForUpload(upload *Upload, initAttrs map[string]string) (*node.Nod return n, nil } -func initNewNode(upload *Upload, n *node.Node, fsize uint64) (*flock.Flock, error) { +func initNewNode(upload *Upload, n *node.Node, fsize uint64) (*lockedfile.File, error) { n.ID = uuid.New().String() // create folder structure (if needed) @@ -324,18 +327,27 @@ func initNewNode(upload *Upload, n *node.Node, fsize 
uint64) (*flock.Flock, erro return nil, err } - if _, err := os.Create(n.InternalPath()); err != nil { + // create and write lock new node metadata + f, err := lockedfile.OpenFile(upload.lu.MetadataBackend().MetadataPath(n.InternalPath()), os.O_RDWR|os.O_CREATE, 0600) + if err != nil { return nil, err } - lock, err := filelocks.AcquireWriteLock(n.InternalPath()) - if err != nil { - // we cannot acquire a lock - we error for safety - return lock, err + switch upload.lu.MetadataBackend().(type) { + case metadata.MessagePackBackend: + // for the messagepack backend we also need to touch the actual node file here. + // it stores the mtime of the resource, which must not change when we update the metadata file + h, err := os.OpenFile(n.InternalPath(), os.O_CREATE, 0600) + if err != nil { + return f, err + } + h.Close() + case metadata.XattrsBackend: + // nothing to do } if _, err := node.CheckQuota(n.SpaceRoot, false, 0, fsize); err != nil { - return lock, err + return f, err } // link child name to parent if it is new @@ -343,23 +355,23 @@ func initNewNode(upload *Upload, n *node.Node, fsize uint64) (*flock.Flock, erro link, err := os.Readlink(childNameLink) if err == nil && link != "../"+n.ID { if err := os.Remove(childNameLink); err != nil { - return lock, errors.Wrap(err, "Decomposedfs: could not remove symlink child entry") + return f, errors.Wrap(err, "Decomposedfs: could not remove symlink child entry") } } if errors.Is(err, iofs.ErrNotExist) || link != "../"+n.ID { relativeNodePath := filepath.Join("../../../../../", lookup.Pathify(n.ID, 4, 2)) if err = os.Symlink(relativeNodePath, childNameLink); err != nil { - return lock, errors.Wrap(err, "Decomposedfs: could not symlink child entry") + return f, errors.Wrap(err, "Decomposedfs: could not symlink child entry") } } // on a new file the sizeDiff is the fileSize upload.sizeDiff = int64(fsize) upload.Info.MetaData["sizeDiff"] = strconv.Itoa(int(upload.sizeDiff)) - return lock, nil + return f, nil } -func
updateExistingNode(upload *Upload, n *node.Node, spaceID string, fsize uint64) (*flock.Flock, error) { +func updateExistingNode(upload *Upload, n *node.Node, spaceID string, fsize uint64) (*lockedfile.File, error) { old, _ := node.ReadNode(upload.Ctx, upload.lu, spaceID, n.ID, false) if _, err := node.CheckQuota(n.SpaceRoot, true, uint64(old.Blobsize), fsize); err != nil { return nil, err @@ -389,15 +401,15 @@ func updateExistingNode(upload *Upload, n *node.Node, spaceID string, fsize uint targetPath := n.InternalPath() - lock, err := filelocks.AcquireWriteLock(targetPath) + // write lock existing node before reading treesize or tree time + f, err := lockedfile.OpenFile(upload.lu.MetadataBackend().MetadataPath(targetPath), os.O_RDWR, 0600) if err != nil { - // we cannot acquire a lock - we error for safety return nil, err } // create version node if _, err := os.Create(upload.versionsPath); err != nil { - return lock, err + return f, err } // copy blob metadata to version node @@ -406,13 +418,13 @@ func updateExistingNode(upload *Upload, n *node.Node, spaceID string, fsize uint attributeName == prefixes.TypeAttr || attributeName == prefixes.BlobIDAttr || attributeName == prefixes.BlobsizeAttr - }, lock); err != nil { - return lock, err + }, f); err != nil { + return f, err } // keep mtime from previous version if err := os.Chtimes(upload.versionsPath, tmtime, tmtime); err != nil { - return lock, errtypes.InternalError(fmt.Sprintf("failed to change mtime of version node: %s", err)) + return f, errtypes.InternalError(fmt.Sprintf("failed to change mtime of version node: %s", err)) } // update mtime of current version @@ -421,7 +433,7 @@ func updateExistingNode(upload *Upload, n *node.Node, spaceID string, fsize uint return nil, err } - return lock, nil + return f, nil } // lookupNode looks up nodes by path. 
diff --git a/pkg/storage/utils/decomposedfs/upload/upload.go b/pkg/storage/utils/decomposedfs/upload/upload.go index d8ab3b411e..d576cb2314 100644 --- a/pkg/storage/utils/decomposedfs/upload/upload.go +++ b/pkg/storage/utils/decomposedfs/upload/upload.go @@ -229,10 +229,10 @@ func (upload *Upload) FinishUpload(_ context.Context) error { } // update checksums - attrs := map[string]string{ - prefixes.ChecksumPrefix + "sha1": string(sha1h.Sum(nil)), - prefixes.ChecksumPrefix + "md5": string(md5h.Sum(nil)), - prefixes.ChecksumPrefix + "adler32": string(adler32h.Sum(nil)), + attrs := node.Attributes{ + prefixes.ChecksumPrefix + "sha1": sha1h.Sum(nil), + prefixes.ChecksumPrefix + "md5": md5h.Sum(nil), + prefixes.ChecksumPrefix + "adler32": adler32h.Sum(nil), } n, err := CreateNodeForUpload(upload, attrs) diff --git a/pkg/storage/utils/decomposedfs/upload_test.go b/pkg/storage/utils/decomposedfs/upload_test.go index b1b247cf48..5c5f8e9a1d 100644 --- a/pkg/storage/utils/decomposedfs/upload_test.go +++ b/pkg/storage/utils/decomposedfs/upload_test.go @@ -173,7 +173,7 @@ var _ = Describe("File uploads", func() { // the space name attribute is the stop condition in the lookup h, err := lu.NodeFromResource(ctx, rootRef) Expect(err).ToNot(HaveOccurred()) - err = h.SetXattr(prefixes.SpaceNameAttr, "username") + err = h.SetXattrString(prefixes.SpaceNameAttr, "username") Expect(err).ToNot(HaveOccurred()) permissions.On("AssemblePermissions", mock.Anything, mock.Anything, mock.Anything).Return(provider.ResourcePermissions{ Stat: true, diff --git a/tests/oc-integration-tests/drone/storage-users-ocis.toml b/tests/oc-integration-tests/drone/storage-users-ocis.toml index 0fc1662230..109095a465 100644 --- a/tests/oc-integration-tests/drone/storage-users-ocis.toml +++ b/tests/oc-integration-tests/drone/storage-users-ocis.toml @@ -27,6 +27,7 @@ treesize_accounting = true permissionssvc = "localhost:10000" personalspacealias_template = "{{.SpaceType}}/{{.User.Username}}" 
generalspacealias_template = "{{.SpaceType}}/{{.SpaceName | replace \" \" \"-\" | lower}}" +metadata_backend = "messagepack" # we have a locally running dataprovider [http] @@ -41,3 +42,4 @@ root = "/drone/src/tmp/reva/data" treetime_accounting = true treesize_accounting = true permissionssvc = "localhost:10000" +metadata_backend = "messagepack" diff --git a/tests/oc-integration-tests/drone/storage-users-s3ng.toml b/tests/oc-integration-tests/drone/storage-users-s3ng.toml index 8d55379981..db2f0ba336 100644 --- a/tests/oc-integration-tests/drone/storage-users-s3ng.toml +++ b/tests/oc-integration-tests/drone/storage-users-s3ng.toml @@ -28,7 +28,7 @@ permissionssvc = "localhost:10000" "s3.bucket" = "test" "s3.access_key" = "test" "s3.secret_key" = "test" -"metadata_backend" = "ini" +"metadata_backend" = "xattrs" # we have a locally running dataprovider [http] @@ -48,4 +48,4 @@ permissionssvc = "localhost:10000" "s3.bucket" = "test" "s3.access_key" = "test" "s3.secret_key" = "test" -"metadata_backend" = "ini" +"metadata_backend" = "xattrs"