Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ini metadata backend #3711

Merged
merged 15 commits into from
Mar 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/unreleased/replace-ini-backend.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bugfix: Replace ini metadata backend by messagepack backend

We replaced the ini metadata backend by a messagepack backend which is more robust and also uses less resources.

https://github.com/cs3org/reva/pull/3711
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ require (
github.com/rs/zerolog v1.28.0
github.com/sciencemesh/meshdirectory-web v1.0.4
github.com/sethvargo/go-password v0.2.0
github.com/shamaton/msgpack/v2 v2.1.1
github.com/stretchr/testify v1.8.1
github.com/studio-b12/gowebdav v0.0.0-20221015232716-17255f2e7423
github.com/test-go/testify v1.1.4
Expand All @@ -86,7 +87,6 @@ require (
google.golang.org/genproto v0.0.0-20221024183307-1bc688fe9f3e
google.golang.org/grpc v1.50.1
google.golang.org/protobuf v1.28.1
gopkg.in/ini.v1 v1.67.0
gotest.tools v2.2.0+incompatible
)

Expand Down Expand Up @@ -204,6 +204,7 @@ require (
golang.org/x/time v0.1.0 // indirect
golang.org/x/tools v0.4.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/ini.v1 v1.66.6 // indirect
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,8 @@ github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNX
github.com/sethgrid/pester v0.0.0-20190127155807-68a33a018ad0/go.mod h1:Ad7IjTpvzZO8Fl0vh9AzQ+j/jYZfyp2diGwI8m5q+ns=
github.com/sethvargo/go-password v0.2.0 h1:BTDl4CC/gjf/axHMaDQtw507ogrXLci6XRiLc7i/UHI=
github.com/sethvargo/go-password v0.2.0/go.mod h1:Ym4Mr9JXLBycr02MFuVQ/0JHidNetSgbzutTr3zsYXE=
github.com/shamaton/msgpack/v2 v2.1.1 h1:gAMxOtVJz93R0EwewwUc8tx30n34aV6BzJuwHE8ogAk=
github.com/shamaton/msgpack/v2 v2.1.1/go.mod h1:aTUEmh31ziGX1Ml7wMPLVY0f4vT3CRsCvZRoSCs+VGg=
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 h1:bUGsEnyNbVPw06Bs80sCeARAlK8lhwqGyi6UT8ymuGk=
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg=
github.com/shurcooL/vfsgen v0.0.0-20200824052919-0d455de96546 h1:pXY9qYc/MP5zdvqWEUH6SjNiu7VhSjuVFTFiTcphaLU=
Expand Down Expand Up @@ -1551,8 +1553,8 @@ gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/h2non/gock.v1 v1.0.14/go.mod h1:sX4zAkdYX1TRGJ2JY156cFspQn4yRWn6p9EMdODlynE=
gopkg.in/h2non/gock.v1 v1.1.2/go.mod h1:n7UGz/ckNChHiK05rDoiC4MYSunEC/lyaUm2WWaDva0=
gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.66.6 h1:LATuAqN/shcYAOkv3wl2L4rkaKqkcgTBQjOyYDvcPKI=
gopkg.in/ini.v1 v1.66.6/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/square/go-jose.v2 v2.6.0 h1:NGk74WTnPKBNUhNzQX7PYcTLUjoq7mzKk2OKbvwk2iI=
gopkg.in/square/go-jose.v2 v2.6.0/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
gopkg.in/telebot.v3 v3.0.0/go.mod h1:7rExV8/0mDDNu9epSrDm/8j22KLaActH1Tbee6YjzWg=
Expand Down
8 changes: 5 additions & 3 deletions pkg/storage/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package cache

import (
"encoding/json"
"fmt"
"strings"
"sync"
Expand All @@ -32,6 +31,7 @@ import (
redisopts "github.com/go-redis/redis/v8"
"github.com/nats-io/nats.go"
microetcd "github.com/owncloud/ocis/v2/ocis-pkg/store/etcd"
"github.com/shamaton/msgpack/v2"
microstore "go-micro.dev/v4/store"
)

Expand Down Expand Up @@ -84,6 +84,7 @@ type CreatePersonalSpaceCache interface {
// FileMetadataCache handles file metadata
type FileMetadataCache interface {
Cache
RemoveMetadata(path string) error
}

// GetStatCache will return an existing StatCache for the given store, nodes, database and table
Expand Down Expand Up @@ -238,12 +239,13 @@ func (cache cacheStore) PullFromCache(key string, dest interface{}) error {
if len(r) == 0 {
return fmt.Errorf("not found")
}
return json.Unmarshal(r[0].Value, dest)

return msgpack.Unmarshal(r[0].Value, &dest)
}

// PushToCache pushes a key and value to the configured database and table of the underlying store
func (cache cacheStore) PushToCache(key string, src interface{}) error {
b, err := json.Marshal(src)
b, err := msgpack.Marshal(src)
if err != nil {
return err
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/storage/cache/filemetadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package cache

import (
"strings"
"time"
)

Expand All @@ -36,3 +37,18 @@ func NewFileMetadataCache(store string, nodes []string, database, table string,

return c
}

// RemoveMetadata removes a reference from the metadata cache
func (c *fileMetadataCache) RemoveMetadata(path string) error {
keys, err := c.List()
if err != nil {
return err
}

for _, key := range keys {
if strings.HasPrefix(key, path) {
_ = c.Delete(key)
}
}
return nil
}
8 changes: 4 additions & 4 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ func NewDefault(m map[string]interface{}, bs tree.Blobstore, es events.Stream) (
switch o.MetadataBackend {
case "xattrs":
lu = lookup.New(metadata.XattrsBackend{}, o)
case "ini":
lu = lookup.New(metadata.NewIniBackend(o.FileMetadataCache), o)
case "messagepack":
lu = lookup.New(metadata.NewMessagePackBackend(o.Root, o.FileMetadataCache), o)
default:
return nil, fmt.Errorf("unknown metadata backend %s, only 'ini' or 'xattrs' (default) supported", o.MetadataBackend)
return nil, fmt.Errorf("unknown metadata backend %s, only 'messagepack' or 'xattrs' (default) supported", o.MetadataBackend)
}

tp := tree.New(o.Root, o.TreeTimeAccounting, o.TreeSizeAccounting, lu, bs)
Expand Down Expand Up @@ -575,7 +575,7 @@ func (fs *Decomposedfs) CreateDir(ctx context.Context, ref *provider.Reference)

if fs.o.TreeTimeAccounting || fs.o.TreeSizeAccounting {
// mark the home node as the end of propagation
if err = n.SetXattr(prefixes.PropagationAttr, "1"); err != nil {
if err = n.SetXattrString(prefixes.PropagationAttr, "1"); err != nil {
appctx.GetLogger(ctx).Error().Err(err).Interface("node", n).Msg("could not mark node to propagate")

// FIXME: This does not return an error at all, but results in a severe situation that the
Expand Down
20 changes: 20 additions & 0 deletions pkg/storage/utils/decomposedfs/decomposedfs_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,26 @@ var _ = Describe("Decomposed", func() {
woFile.Close()
Eventually(func() bool { s, _ := states.Load("managedToOpenroFile"); return s.(bool) }).Should(BeTrue())
})

It("allows opening rw while an exclusive lock is being held", func() {
states := sync.Map{}

path, err := os.CreateTemp("", "decomposedfs-lockedfile-test-")
Expect(err).ToNot(HaveOccurred())
states.Store("managedToOpenrwLockedFile", false)
woFile, err := lockedfile.OpenFile(path.Name(), os.O_WRONLY, 0)
Expect(err).ToNot(HaveOccurred())
defer woFile.Close()
go func() {
roFile, err := os.OpenFile(path.Name(), os.O_RDWR, 0)
Expect(err).ToNot(HaveOccurred())
defer roFile.Close()
states.Store("managedToOpenrwLockedFile", true)
}()

woFile.Close()
Eventually(func() bool { s, _ := states.Load("managedToOpenrwLockedFile"); return s.(bool) }).Should(BeTrue())
})
})

})
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/utils/decomposedfs/grants.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (fs *Decomposedfs) ListGrants(ctx context.Context, ref *provider.Reference)
return nil, errtypes.NotFound(f)
}
log := appctx.GetLogger(ctx)
var attrs map[string]string
var attrs node.Attributes
if attrs, err = grantNode.Xattrs(); err != nil {
log.Error().Err(err).Msg("error listing attributes")
return nil, err
Expand All @@ -146,7 +146,7 @@ func (fs *Decomposedfs) ListGrants(ctx context.Context, ref *provider.Reference)
var err error
var e *ace.ACE
principal := k[len(prefixes.GrantPrefix):]
if e, err = ace.Unmarshal(principal, []byte(v)); err != nil {
if e, err = ace.Unmarshal(principal, v); err != nil {
log.Error().Err(err).Str("principal", principal).Str("attr", k).Msg("could not unmarshal ace")
continue
}
Expand Down Expand Up @@ -312,7 +312,7 @@ func (fs *Decomposedfs) storeGrant(ctx context.Context, n *node.Node, g *provide
// set the grant
e := ace.FromGrant(g)
principal, value := e.Marshal()
if err := n.SetXattr(prefixes.GrantPrefix+principal, string(value)); err != nil {
if err := n.SetXattr(prefixes.GrantPrefix+principal, value); err != nil {
appctx.GetLogger(ctx).Error().Err(err).
Str("principal", principal).Msg("Could not set grant for principal")
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/utils/decomposedfs/grants_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ var _ = Describe("Grants", func() {
Path: "/dir1",
})
Expect(err).ToNot(HaveOccurred())
attr, err := n.Xattr(prefixes.GrantUserAcePrefix + grant.Grantee.GetUserId().OpaqueId)
attr, err := n.XattrString(prefixes.GrantUserAcePrefix + grant.Grantee.GetUserId().OpaqueId)
Expect(err).ToNot(HaveOccurred())
Expect(attr).To(Equal(fmt.Sprintf("\x00t=A:f=:p=rw:c=%s:e=0\n", o.GetOpaqueId()))) // NOTE: this tests ace package
})
Expand Down
80 changes: 27 additions & 53 deletions pkg/storage/utils/decomposedfs/lookup/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"

provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
Expand All @@ -33,9 +32,8 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
"github.com/cs3org/reva/v2/pkg/storage/utils/filelocks"
"github.com/gofrs/flock"
"github.com/pkg/errors"
"github.com/rogpeppe/go-internal/lockedfile"
)

// Lookup implements transformations from filepath to node and back
Expand All @@ -60,14 +58,10 @@ func (lu *Lookup) MetadataBackend() metadata.Backend {

// ReadBlobSizeAttr reads the blobsize from the xattrs
func (lu *Lookup) ReadBlobSizeAttr(path string) (int64, error) {
attr, err := lu.metadataBackend.Get(path, prefixes.BlobsizeAttr)
blobSize, err := lu.metadataBackend.GetInt64(path, prefixes.BlobsizeAttr)
if err != nil {
return 0, errors.Wrapf(err, "error reading blobsize xattr")
}
blobSize, err := strconv.ParseInt(attr, 10, 64)
if err != nil {
return 0, errors.Wrapf(err, "invalid blobsize xattr format")
}
return blobSize, nil
}

Expand All @@ -77,22 +71,18 @@ func (lu *Lookup) ReadBlobIDAttr(path string) (string, error) {
if err != nil {
return "", errors.Wrapf(err, "error reading blobid xattr")
}
return attr, nil
return string(attr), nil
}

// TypeFromPath returns the type of the node at the given path
func (lu *Lookup) TypeFromPath(path string) provider.ResourceType {
// Try to read from xattrs
typeAttr, err := lu.metadataBackend.Get(path, prefixes.TypeAttr)
t := provider.ResourceType_RESOURCE_TYPE_INVALID
typeAttr, err := lu.metadataBackend.GetInt64(path, prefixes.TypeAttr)
if err == nil {
typeInt, err := strconv.ParseInt(typeAttr, 10, 32)
if err != nil {
return t
}
return provider.ResourceType(typeInt)
return provider.ResourceType(int32(typeAttr))
}

t := provider.ResourceType_RESOURCE_TYPE_INVALID
// Fall back to checking on disk
fi, err := os.Lstat(path)
if err != nil {
Expand Down Expand Up @@ -219,7 +209,7 @@ func (lu *Lookup) WalkPath(ctx context.Context, r *node.Node, p string, followRe
if followReferences {
if attrBytes, err := r.Xattr(prefixes.ReferenceAttr); err == nil {
realNodeID := attrBytes
ref, err := refFromCS3([]byte(realNodeID))
ref, err := refFromCS3(realNodeID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -277,69 +267,53 @@ func refFromCS3(b []byte) (*provider.Reference, error) {
// CopyMetadata copies all extended attributes from source to target.
// The optional filter function can be used to filter by attribute name, e.g. by checking a prefix
// For the source file, a shared lock is acquired.
// NOTE: target resource is not locked! You need to acquire a write lock on the target additionally
// NOTE: target resource will be write locked!
func (lu *Lookup) CopyMetadata(src, target string, filter func(attributeName string) bool) (err error) {
var readLock *flock.Flock

// Acquire a read log on the source node
readLock, err = filelocks.AcquireReadLock(src)
// write lock existing node before reading treesize or tree time
f, err := lockedfile.Open(lu.MetadataBackend().MetadataPath(src))
if err != nil {
return err
}

if err != nil {
return errors.Wrap(err, "xattrs: Unable to lock source to read")
}
defer func() {
rerr := filelocks.ReleaseLock(readLock)
rerr := f.Close()

// if err is non nil we do not overwrite that
if err == nil {
err = rerr
}
}()

return lu.CopyMetadataWithSourceLock(src, target, filter, readLock)
return lu.CopyMetadataWithSourceLock(src, target, filter, f)
}

// CopyMetadataWithSourceLock copies all extended attributes from source to target.
// The optional filter function can be used to filter by attribute name, e.g. by checking a prefix
// For the source file, a shared lock is acquired.
// NOTE: target resource is not locked! You need to acquire a write lock on the target additionally
func (lu *Lookup) CopyMetadataWithSourceLock(src, target string, filter func(attributeName string) bool, readLock *flock.Flock) (err error) {
// For the source file, a matching lockedfile is required.
// NOTE: target resource will be write locked!
func (lu *Lookup) CopyMetadataWithSourceLock(sourcePath, targetPath string, filter func(attributeName string) bool, lockedSource *lockedfile.File) (err error) {
switch {
case readLock == nil:
case lockedSource == nil:
return errors.New("no lock provided")
case readLock.Path() != filelocks.FlockFile(src):
case lockedSource.File.Name() != lu.MetadataBackend().MetadataPath(sourcePath):
return errors.New("lockpath does not match filepath")
case !readLock.Locked() && !readLock.RLocked(): // we need either a read or a write lock
return errors.New("not locked")
}

// both locks are established. Copy.
var attrNameList []string
if attrNameList, err = lu.metadataBackend.List(src); err != nil {
return errors.Wrap(err, "Can not get xattr listing on src")
attrs, err := lu.metadataBackend.AllWithLockedSource(sourcePath, lockedSource)
if err != nil {
return err
}

// error handling: We count errors of reads or writes of xattrs.
// if there were any read or write errors an error is returned.
var (
xerrs = 0
xerr error
)
for idx := range attrNameList {
attrName := attrNameList[idx]
newAttrs := make(map[string][]byte, 0)
for attrName, val := range attrs {
if filter == nil || filter(attrName) {
var attrVal string
if attrVal, xerr = lu.metadataBackend.Get(src, attrName); xerr != nil {
xerrs++
}
if xerr = lu.metadataBackend.Set(target, attrName, attrVal); xerr != nil {
xerrs++
}
newAttrs[attrName] = val
}
}
if xerrs > 0 {
err = errors.Wrap(xerr, "failed to copy all xattrs, last error returned")
}

return err
return lu.MetadataBackend().SetMultiple(targetPath, newAttrs, true)
}
2 changes: 1 addition & 1 deletion pkg/storage/utils/decomposedfs/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (fs *Decomposedfs) SetArbitraryMetadata(ctx context.Context, ref *provider.
}
for k, v := range md.Metadata {
attrName := prefixes.MetadataPrefix + k
if err = n.SetXattr(attrName, v); err != nil {
if err = n.SetXattrString(attrName, v); err != nil {
errs = append(errs, errors.Wrap(err, "Decomposedfs: could not set metadata attribute "+attrName+" to "+k))
}
}
Expand Down
Loading