Skip to content

Commit

Permalink
Fix ini metadata backend (#3711)
Browse files Browse the repository at this point in the history
* Use msgpack to serialize cache data instead of json

* Use storage-relative paths as the cache key

That prevents non-aligned cache keys when multiple providers mount the
storage at a different location.

* Create dedicated type for node attributes, based on map[string][]byte

* add messagepack backend

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

* Switch msgpack library to a faster, leaner one

* Adapt changelog

* Cleanup

* Fix test

* Fix error message

* Use messagepack backend in the s3ng acceptance tests

* make hound happy

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

* flip tests

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

* Do not try to get the tmtime for files being uploaded

* Extend the metadata backend to read the attributes from a locked source

* Fix comment

---------

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>
Co-authored-by: Jörn Friedrich Dreyer <[email protected]>
  • Loading branch information
aduffeck and butonic authored Mar 13, 2023
1 parent 07198ea commit 4339b8a
Show file tree
Hide file tree
Showing 30 changed files with 767 additions and 686 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/replace-ini-backend.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bugfix: Replace ini metadata backend by messagepack backend

We replaced the ini metadata backend by a messagepack backend which is more robust and also uses less resources.

https://github.com/cs3org/reva/pull/3711
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ require (
github.com/rs/zerolog v1.28.0
github.com/sciencemesh/meshdirectory-web v1.0.4
github.com/sethvargo/go-password v0.2.0
github.com/shamaton/msgpack/v2 v2.1.1
github.com/stretchr/testify v1.8.1
github.com/studio-b12/gowebdav v0.0.0-20221015232716-17255f2e7423
github.com/test-go/testify v1.1.4
Expand All @@ -86,7 +87,6 @@ require (
google.golang.org/genproto v0.0.0-20221024183307-1bc688fe9f3e
google.golang.org/grpc v1.50.1
google.golang.org/protobuf v1.28.1
gopkg.in/ini.v1 v1.67.0
gotest.tools v2.2.0+incompatible
)

Expand Down Expand Up @@ -204,6 +204,7 @@ require (
golang.org/x/time v0.1.0 // indirect
golang.org/x/tools v0.4.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/ini.v1 v1.66.6 // indirect
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,8 @@ github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNX
github.com/sethgrid/pester v0.0.0-20190127155807-68a33a018ad0/go.mod h1:Ad7IjTpvzZO8Fl0vh9AzQ+j/jYZfyp2diGwI8m5q+ns=
github.com/sethvargo/go-password v0.2.0 h1:BTDl4CC/gjf/axHMaDQtw507ogrXLci6XRiLc7i/UHI=
github.com/sethvargo/go-password v0.2.0/go.mod h1:Ym4Mr9JXLBycr02MFuVQ/0JHidNetSgbzutTr3zsYXE=
github.com/shamaton/msgpack/v2 v2.1.1 h1:gAMxOtVJz93R0EwewwUc8tx30n34aV6BzJuwHE8ogAk=
github.com/shamaton/msgpack/v2 v2.1.1/go.mod h1:aTUEmh31ziGX1Ml7wMPLVY0f4vT3CRsCvZRoSCs+VGg=
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 h1:bUGsEnyNbVPw06Bs80sCeARAlK8lhwqGyi6UT8ymuGk=
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg=
github.com/shurcooL/vfsgen v0.0.0-20200824052919-0d455de96546 h1:pXY9qYc/MP5zdvqWEUH6SjNiu7VhSjuVFTFiTcphaLU=
Expand Down Expand Up @@ -1551,8 +1553,8 @@ gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/h2non/gock.v1 v1.0.14/go.mod h1:sX4zAkdYX1TRGJ2JY156cFspQn4yRWn6p9EMdODlynE=
gopkg.in/h2non/gock.v1 v1.1.2/go.mod h1:n7UGz/ckNChHiK05rDoiC4MYSunEC/lyaUm2WWaDva0=
gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.66.6 h1:LATuAqN/shcYAOkv3wl2L4rkaKqkcgTBQjOyYDvcPKI=
gopkg.in/ini.v1 v1.66.6/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/square/go-jose.v2 v2.6.0 h1:NGk74WTnPKBNUhNzQX7PYcTLUjoq7mzKk2OKbvwk2iI=
gopkg.in/square/go-jose.v2 v2.6.0/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
gopkg.in/telebot.v3 v3.0.0/go.mod h1:7rExV8/0mDDNu9epSrDm/8j22KLaActH1Tbee6YjzWg=
Expand Down
8 changes: 5 additions & 3 deletions pkg/storage/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package cache

import (
"encoding/json"
"fmt"
"strings"
"sync"
Expand All @@ -32,6 +31,7 @@ import (
redisopts "github.com/go-redis/redis/v8"
"github.com/nats-io/nats.go"
microetcd "github.com/owncloud/ocis/v2/ocis-pkg/store/etcd"
"github.com/shamaton/msgpack/v2"
microstore "go-micro.dev/v4/store"
)

Expand Down Expand Up @@ -84,6 +84,7 @@ type CreatePersonalSpaceCache interface {
// FileMetadataCache handles file metadata
type FileMetadataCache interface {
Cache
RemoveMetadata(path string) error
}

// GetStatCache will return an existing StatCache for the given store, nodes, database and table
Expand Down Expand Up @@ -238,12 +239,13 @@ func (cache cacheStore) PullFromCache(key string, dest interface{}) error {
if len(r) == 0 {
return fmt.Errorf("not found")
}
return json.Unmarshal(r[0].Value, dest)

return msgpack.Unmarshal(r[0].Value, &dest)
}

// PushToCache pushes a key and value to the configured database and table of the underlying store
func (cache cacheStore) PushToCache(key string, src interface{}) error {
b, err := json.Marshal(src)
b, err := msgpack.Marshal(src)
if err != nil {
return err
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/storage/cache/filemetadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package cache

import (
"strings"
"time"
)

Expand All @@ -36,3 +37,18 @@ func NewFileMetadataCache(store string, nodes []string, database, table string,

return c
}

// RemoveMetadata removes a reference from the metadata cache
func (c *fileMetadataCache) RemoveMetadata(path string) error {
keys, err := c.List()
if err != nil {
return err
}

for _, key := range keys {
if strings.HasPrefix(key, path) {
_ = c.Delete(key)
}
}
return nil
}
8 changes: 4 additions & 4 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ func NewDefault(m map[string]interface{}, bs tree.Blobstore, es events.Stream) (
switch o.MetadataBackend {
case "xattrs":
lu = lookup.New(metadata.XattrsBackend{}, o)
case "ini":
lu = lookup.New(metadata.NewIniBackend(o.FileMetadataCache), o)
case "messagepack":
lu = lookup.New(metadata.NewMessagePackBackend(o.Root, o.FileMetadataCache), o)
default:
return nil, fmt.Errorf("unknown metadata backend %s, only 'ini' or 'xattrs' (default) supported", o.MetadataBackend)
return nil, fmt.Errorf("unknown metadata backend %s, only 'messagepack' or 'xattrs' (default) supported", o.MetadataBackend)
}

tp := tree.New(o.Root, o.TreeTimeAccounting, o.TreeSizeAccounting, lu, bs)
Expand Down Expand Up @@ -575,7 +575,7 @@ func (fs *Decomposedfs) CreateDir(ctx context.Context, ref *provider.Reference)

if fs.o.TreeTimeAccounting || fs.o.TreeSizeAccounting {
// mark the home node as the end of propagation
if err = n.SetXattr(prefixes.PropagationAttr, "1"); err != nil {
if err = n.SetXattrString(prefixes.PropagationAttr, "1"); err != nil {
appctx.GetLogger(ctx).Error().Err(err).Interface("node", n).Msg("could not mark node to propagate")

// FIXME: This does not return an error at all, but results in a severe situation that the
Expand Down
20 changes: 20 additions & 0 deletions pkg/storage/utils/decomposedfs/decomposedfs_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,26 @@ var _ = Describe("Decomposed", func() {
woFile.Close()
Eventually(func() bool { s, _ := states.Load("managedToOpenroFile"); return s.(bool) }).Should(BeTrue())
})

It("allows opening rw while an exclusive lock is being held", func() {
states := sync.Map{}

path, err := os.CreateTemp("", "decomposedfs-lockedfile-test-")
Expect(err).ToNot(HaveOccurred())
states.Store("managedToOpenrwLockedFile", false)
woFile, err := lockedfile.OpenFile(path.Name(), os.O_WRONLY, 0)
Expect(err).ToNot(HaveOccurred())
defer woFile.Close()
go func() {
roFile, err := os.OpenFile(path.Name(), os.O_RDWR, 0)
Expect(err).ToNot(HaveOccurred())
defer roFile.Close()
states.Store("managedToOpenrwLockedFile", true)
}()

woFile.Close()
Eventually(func() bool { s, _ := states.Load("managedToOpenrwLockedFile"); return s.(bool) }).Should(BeTrue())
})
})

})
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/utils/decomposedfs/grants.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (fs *Decomposedfs) ListGrants(ctx context.Context, ref *provider.Reference)
return nil, errtypes.NotFound(f)
}
log := appctx.GetLogger(ctx)
var attrs map[string]string
var attrs node.Attributes
if attrs, err = grantNode.Xattrs(); err != nil {
log.Error().Err(err).Msg("error listing attributes")
return nil, err
Expand All @@ -146,7 +146,7 @@ func (fs *Decomposedfs) ListGrants(ctx context.Context, ref *provider.Reference)
var err error
var e *ace.ACE
principal := k[len(prefixes.GrantPrefix):]
if e, err = ace.Unmarshal(principal, []byte(v)); err != nil {
if e, err = ace.Unmarshal(principal, v); err != nil {
log.Error().Err(err).Str("principal", principal).Str("attr", k).Msg("could not unmarshal ace")
continue
}
Expand Down Expand Up @@ -312,7 +312,7 @@ func (fs *Decomposedfs) storeGrant(ctx context.Context, n *node.Node, g *provide
// set the grant
e := ace.FromGrant(g)
principal, value := e.Marshal()
if err := n.SetXattr(prefixes.GrantPrefix+principal, string(value)); err != nil {
if err := n.SetXattr(prefixes.GrantPrefix+principal, value); err != nil {
appctx.GetLogger(ctx).Error().Err(err).
Str("principal", principal).Msg("Could not set grant for principal")
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/utils/decomposedfs/grants_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ var _ = Describe("Grants", func() {
Path: "/dir1",
})
Expect(err).ToNot(HaveOccurred())
attr, err := n.Xattr(prefixes.GrantUserAcePrefix + grant.Grantee.GetUserId().OpaqueId)
attr, err := n.XattrString(prefixes.GrantUserAcePrefix + grant.Grantee.GetUserId().OpaqueId)
Expect(err).ToNot(HaveOccurred())
Expect(attr).To(Equal(fmt.Sprintf("\x00t=A:f=:p=rw:c=%s:e=0\n", o.GetOpaqueId()))) // NOTE: this tests ace package
})
Expand Down
80 changes: 27 additions & 53 deletions pkg/storage/utils/decomposedfs/lookup/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"

provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
Expand All @@ -33,9 +32,8 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
"github.com/cs3org/reva/v2/pkg/storage/utils/filelocks"
"github.com/gofrs/flock"
"github.com/pkg/errors"
"github.com/rogpeppe/go-internal/lockedfile"
)

// Lookup implements transformations from filepath to node and back
Expand All @@ -60,14 +58,10 @@ func (lu *Lookup) MetadataBackend() metadata.Backend {

// ReadBlobSizeAttr reads the blobsize from the xattrs
func (lu *Lookup) ReadBlobSizeAttr(path string) (int64, error) {
attr, err := lu.metadataBackend.Get(path, prefixes.BlobsizeAttr)
blobSize, err := lu.metadataBackend.GetInt64(path, prefixes.BlobsizeAttr)
if err != nil {
return 0, errors.Wrapf(err, "error reading blobsize xattr")
}
blobSize, err := strconv.ParseInt(attr, 10, 64)
if err != nil {
return 0, errors.Wrapf(err, "invalid blobsize xattr format")
}
return blobSize, nil
}

Expand All @@ -77,22 +71,18 @@ func (lu *Lookup) ReadBlobIDAttr(path string) (string, error) {
if err != nil {
return "", errors.Wrapf(err, "error reading blobid xattr")
}
return attr, nil
return string(attr), nil
}

// TypeFromPath returns the type of the node at the given path
func (lu *Lookup) TypeFromPath(path string) provider.ResourceType {
// Try to read from xattrs
typeAttr, err := lu.metadataBackend.Get(path, prefixes.TypeAttr)
t := provider.ResourceType_RESOURCE_TYPE_INVALID
typeAttr, err := lu.metadataBackend.GetInt64(path, prefixes.TypeAttr)
if err == nil {
typeInt, err := strconv.ParseInt(typeAttr, 10, 32)
if err != nil {
return t
}
return provider.ResourceType(typeInt)
return provider.ResourceType(int32(typeAttr))
}

t := provider.ResourceType_RESOURCE_TYPE_INVALID
// Fall back to checking on disk
fi, err := os.Lstat(path)
if err != nil {
Expand Down Expand Up @@ -219,7 +209,7 @@ func (lu *Lookup) WalkPath(ctx context.Context, r *node.Node, p string, followRe
if followReferences {
if attrBytes, err := r.Xattr(prefixes.ReferenceAttr); err == nil {
realNodeID := attrBytes
ref, err := refFromCS3([]byte(realNodeID))
ref, err := refFromCS3(realNodeID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -277,69 +267,53 @@ func refFromCS3(b []byte) (*provider.Reference, error) {
// CopyMetadata copies all extended attributes from source to target.
// The optional filter function can be used to filter by attribute name, e.g. by checking a prefix
// For the source file, a shared lock is acquired.
// NOTE: target resource is not locked! You need to acquire a write lock on the target additionally
// NOTE: target resource will be write locked!
func (lu *Lookup) CopyMetadata(src, target string, filter func(attributeName string) bool) (err error) {
var readLock *flock.Flock

// Acquire a read log on the source node
readLock, err = filelocks.AcquireReadLock(src)
// write lock existing node before reading treesize or tree time
f, err := lockedfile.Open(lu.MetadataBackend().MetadataPath(src))
if err != nil {
return err
}

if err != nil {
return errors.Wrap(err, "xattrs: Unable to lock source to read")
}
defer func() {
rerr := filelocks.ReleaseLock(readLock)
rerr := f.Close()

// if err is non nil we do not overwrite that
if err == nil {
err = rerr
}
}()

return lu.CopyMetadataWithSourceLock(src, target, filter, readLock)
return lu.CopyMetadataWithSourceLock(src, target, filter, f)
}

// CopyMetadataWithSourceLock copies all extended attributes from source to target.
// The optional filter function can be used to filter by attribute name, e.g. by checking a prefix
// For the source file, a shared lock is acquired.
// NOTE: target resource is not locked! You need to acquire a write lock on the target additionally
func (lu *Lookup) CopyMetadataWithSourceLock(src, target string, filter func(attributeName string) bool, readLock *flock.Flock) (err error) {
// For the source file, a matching lockedfile is required.
// NOTE: target resource will be write locked!
func (lu *Lookup) CopyMetadataWithSourceLock(sourcePath, targetPath string, filter func(attributeName string) bool, lockedSource *lockedfile.File) (err error) {
switch {
case readLock == nil:
case lockedSource == nil:
return errors.New("no lock provided")
case readLock.Path() != filelocks.FlockFile(src):
case lockedSource.File.Name() != lu.MetadataBackend().MetadataPath(sourcePath):
return errors.New("lockpath does not match filepath")
case !readLock.Locked() && !readLock.RLocked(): // we need either a read or a write lock
return errors.New("not locked")
}

// both locks are established. Copy.
var attrNameList []string
if attrNameList, err = lu.metadataBackend.List(src); err != nil {
return errors.Wrap(err, "Can not get xattr listing on src")
attrs, err := lu.metadataBackend.AllWithLockedSource(sourcePath, lockedSource)
if err != nil {
return err
}

// error handling: We count errors of reads or writes of xattrs.
// if there were any read or write errors an error is returned.
var (
xerrs = 0
xerr error
)
for idx := range attrNameList {
attrName := attrNameList[idx]
newAttrs := make(map[string][]byte, 0)
for attrName, val := range attrs {
if filter == nil || filter(attrName) {
var attrVal string
if attrVal, xerr = lu.metadataBackend.Get(src, attrName); xerr != nil {
xerrs++
}
if xerr = lu.metadataBackend.Set(target, attrName, attrVal); xerr != nil {
xerrs++
}
newAttrs[attrName] = val
}
}
if xerrs > 0 {
err = errors.Wrap(xerr, "failed to copy all xattrs, last error returned")
}

return err
return lu.MetadataBackend().SetMultiple(targetPath, newAttrs, true)
}
2 changes: 1 addition & 1 deletion pkg/storage/utils/decomposedfs/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (fs *Decomposedfs) SetArbitraryMetadata(ctx context.Context, ref *provider.
}
for k, v := range md.Metadata {
attrName := prefixes.MetadataPrefix + k
if err = n.SetXattr(attrName, v); err != nil {
if err = n.SetXattrString(attrName, v); err != nil {
errs = append(errs, errors.Wrap(err, "Decomposedfs: could not set metadata attribute "+attrName+" to "+k))
}
}
Expand Down
Loading

0 comments on commit 4339b8a

Please sign in to comment.