Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: cs3org/reva
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: d57f9c5
Choose a base ref
...
head repository: cs3org/reva
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 819b789
Choose a head ref
  • 6 commits
  • 27 files changed
  • 2 contributors

Commits on Mar 13, 2023

  1. Copy the full SHA
    ca06d06 View commit details
  2. Use storage-relative paths as the cache key

    That prevents non-aligned cache keys when multiple providers mount the
    storage at a different location.
    aduffeck committed Mar 13, 2023
    Copy the full SHA
    719e536 View commit details
  3. Copy the full SHA
    371c1d7 View commit details
  4. add messagepack backend

    Signed-off-by: Jörn Friedrich Dreyer <[email protected]>
    butonic authored and aduffeck committed Mar 13, 2023
    Copy the full SHA
    204253e View commit details
  5. Copy the full SHA
    a6f60ad View commit details
  6. Adapt changelog

    aduffeck committed Mar 13, 2023
    Copy the full SHA
    819b789 View commit details
Showing with 529 additions and 481 deletions.
  1. +5 −0 changelog/unreleased/replace-ini-backend.md
  2. +2 −1 go.mod
  3. +4 −2 go.sum
  4. +5 −3 pkg/storage/cache/cache.go
  5. +16 −0 pkg/storage/cache/filemetadata.go
  6. +5 −3 pkg/storage/utils/decomposedfs/decomposedfs.go
  7. +20 −0 pkg/storage/utils/decomposedfs/decomposedfs_concurrency_test.go
  8. +2 −2 pkg/storage/utils/decomposedfs/grants.go
  9. +1 −1 pkg/storage/utils/decomposedfs/grants_test.go
  10. +25 −51 pkg/storage/utils/decomposedfs/lookup/lookup.go
  11. +1 −1 pkg/storage/utils/decomposedfs/metadata.go
  12. +74 −106 pkg/storage/utils/decomposedfs/metadata/{ini_backend.go → messagepack_backend.go}
  13. +8 −8 pkg/storage/utils/decomposedfs/metadata/metadata.go
  14. +44 −72 pkg/storage/utils/decomposedfs/metadata/metadata_test.go
  15. +10 −15 pkg/storage/utils/decomposedfs/metadata/xattrs_backend.go
  16. +84 −71 pkg/storage/utils/decomposedfs/node/node.go
  17. +1 −1 pkg/storage/utils/decomposedfs/node/node_test.go
  18. +67 −6 pkg/storage/utils/decomposedfs/node/xattrs.go
  19. +4 −4 pkg/storage/utils/decomposedfs/recycle.go
  20. +42 −46 pkg/storage/utils/decomposedfs/spaces.go
  21. +2 −2 pkg/storage/utils/decomposedfs/testhelpers/helpers.go
  22. +1 −1 pkg/storage/utils/decomposedfs/tree/migrations.go
  23. +56 −47 pkg/storage/utils/decomposedfs/tree/tree.go
  24. +1 −1 pkg/storage/utils/decomposedfs/tree/tree_test.go
  25. +44 −32 pkg/storage/utils/decomposedfs/upload/processing.go
  26. +4 −4 pkg/storage/utils/decomposedfs/upload/upload.go
  27. +1 −1 pkg/storage/utils/decomposedfs/upload_test.go
5 changes: 5 additions & 0 deletions changelog/unreleased/replace-ini-backend.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bugfix: Replace ini metadata backend by messagepack backend

We replaced the ini metadata backend by a messagepack backend which is more robust and also uses less resources.

https://github.com/cs3org/reva/pull/3711
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -64,6 +64,7 @@ require (
github.com/rs/zerolog v1.28.0
github.com/sciencemesh/meshdirectory-web v1.0.4
github.com/sethvargo/go-password v0.2.0
github.com/shamaton/msgpack/v2 v2.1.1
github.com/stretchr/testify v1.8.1
github.com/studio-b12/gowebdav v0.0.0-20221015232716-17255f2e7423
github.com/test-go/testify v1.1.4
@@ -86,7 +87,6 @@ require (
google.golang.org/genproto v0.0.0-20221024183307-1bc688fe9f3e
google.golang.org/grpc v1.50.1
google.golang.org/protobuf v1.28.1
gopkg.in/ini.v1 v1.67.0
gotest.tools v2.2.0+incompatible
)

@@ -204,6 +204,7 @@ require (
golang.org/x/time v0.1.0 // indirect
golang.org/x/tools v0.4.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/ini.v1 v1.66.6 // indirect
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -853,6 +853,8 @@ github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNX
github.com/sethgrid/pester v0.0.0-20190127155807-68a33a018ad0/go.mod h1:Ad7IjTpvzZO8Fl0vh9AzQ+j/jYZfyp2diGwI8m5q+ns=
github.com/sethvargo/go-password v0.2.0 h1:BTDl4CC/gjf/axHMaDQtw507ogrXLci6XRiLc7i/UHI=
github.com/sethvargo/go-password v0.2.0/go.mod h1:Ym4Mr9JXLBycr02MFuVQ/0JHidNetSgbzutTr3zsYXE=
github.com/shamaton/msgpack/v2 v2.1.1 h1:gAMxOtVJz93R0EwewwUc8tx30n34aV6BzJuwHE8ogAk=
github.com/shamaton/msgpack/v2 v2.1.1/go.mod h1:aTUEmh31ziGX1Ml7wMPLVY0f4vT3CRsCvZRoSCs+VGg=
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 h1:bUGsEnyNbVPw06Bs80sCeARAlK8lhwqGyi6UT8ymuGk=
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg=
github.com/shurcooL/vfsgen v0.0.0-20200824052919-0d455de96546 h1:pXY9qYc/MP5zdvqWEUH6SjNiu7VhSjuVFTFiTcphaLU=
@@ -1551,8 +1553,8 @@ gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/h2non/gock.v1 v1.0.14/go.mod h1:sX4zAkdYX1TRGJ2JY156cFspQn4yRWn6p9EMdODlynE=
gopkg.in/h2non/gock.v1 v1.1.2/go.mod h1:n7UGz/ckNChHiK05rDoiC4MYSunEC/lyaUm2WWaDva0=
gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.66.6 h1:LATuAqN/shcYAOkv3wl2L4rkaKqkcgTBQjOyYDvcPKI=
gopkg.in/ini.v1 v1.66.6/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/square/go-jose.v2 v2.6.0 h1:NGk74WTnPKBNUhNzQX7PYcTLUjoq7mzKk2OKbvwk2iI=
gopkg.in/square/go-jose.v2 v2.6.0/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
gopkg.in/telebot.v3 v3.0.0/go.mod h1:7rExV8/0mDDNu9epSrDm/8j22KLaActH1Tbee6YjzWg=
8 changes: 5 additions & 3 deletions pkg/storage/cache/cache.go
Original file line number Diff line number Diff line change
@@ -19,7 +19,6 @@
package cache

import (
"encoding/json"
"fmt"
"strings"
"sync"
@@ -32,6 +31,7 @@ import (
redisopts "github.com/go-redis/redis/v8"
"github.com/nats-io/nats.go"
microetcd "github.com/owncloud/ocis/v2/ocis-pkg/store/etcd"
"github.com/shamaton/msgpack/v2"
microstore "go-micro.dev/v4/store"
)

@@ -84,6 +84,7 @@ type CreatePersonalSpaceCache interface {
// FileMetadataCache handles file metadata
type FileMetadataCache interface {
Cache
RemoveMetadata(path string) error
}

// GetStatCache will return an existing StatCache for the given store, nodes, database and table
@@ -238,12 +239,13 @@ func (cache cacheStore) PullFromCache(key string, dest interface{}) error {
if len(r) == 0 {
return fmt.Errorf("not found")
}
return json.Unmarshal(r[0].Value, dest)

return msgpack.Unmarshal(r[0].Value, &dest)
}

// PushToCache pushes a key and value to the configured database and table of the underlying store
func (cache cacheStore) PushToCache(key string, src interface{}) error {
b, err := json.Marshal(src)
b, err := msgpack.Marshal(src)
if err != nil {
return err
}
16 changes: 16 additions & 0 deletions pkg/storage/cache/filemetadata.go
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@
package cache

import (
"strings"
"time"
)

@@ -36,3 +37,18 @@ func NewFileMetadataCache(store string, nodes []string, database, table string,

return c
}

// RemoveMetadata removes a reference from the metadata cache
func (c *fileMetadataCache) RemoveMetadata(path string) error {
keys, err := c.List()
if err != nil {
return err
}

for _, key := range keys {
if strings.HasPrefix(key, path) {
_ = c.Delete(key)
}
}
return nil
}
8 changes: 5 additions & 3 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
@@ -107,10 +107,12 @@ func NewDefault(m map[string]interface{}, bs tree.Blobstore, es events.Stream) (
switch o.MetadataBackend {
case "xattrs":
lu = lookup.New(metadata.XattrsBackend{}, o)
case "messagepack":
lu = lookup.New(metadata.NewMessagePackBackend(o.Root, o.FileMetadataCache), o)
case "ini":
lu = lookup.New(metadata.NewIniBackend(o.FileMetadataCache), o)
lu = lookup.New(metadata.NewIniBackend(o.Root, o.FileMetadataCache), o)
default:
return nil, fmt.Errorf("unknown metadata backend %s, only 'ini' or 'xattrs' (default) supported", o.MetadataBackend)
return nil, fmt.Errorf("unknown metadata backend %s, only 'ini', 'messagepack' or 'xattrs' (default) supported", o.MetadataBackend)
}

tp := tree.New(o.Root, o.TreeTimeAccounting, o.TreeSizeAccounting, lu, bs)
@@ -575,7 +577,7 @@ func (fs *Decomposedfs) CreateDir(ctx context.Context, ref *provider.Reference)

if fs.o.TreeTimeAccounting || fs.o.TreeSizeAccounting {
// mark the home node as the end of propagation
if err = n.SetXattr(prefixes.PropagationAttr, "1"); err != nil {
if err = n.SetXattrString(prefixes.PropagationAttr, "1"); err != nil {
appctx.GetLogger(ctx).Error().Err(err).Interface("node", n).Msg("could not mark node to propagate")

// FIXME: This does not return an error at all, but results in a severe situation that the
20 changes: 20 additions & 0 deletions pkg/storage/utils/decomposedfs/decomposedfs_concurrency_test.go
Original file line number Diff line number Diff line change
@@ -112,6 +112,26 @@ var _ = Describe("Decomposed", func() {
woFile.Close()
Eventually(func() bool { s, _ := states.Load("managedToOpenroFile"); return s.(bool) }).Should(BeTrue())
})

It("allows opening rw while an exclusive lock is being held", func() {
states := sync.Map{}

path, err := os.CreateTemp("", "decomposedfs-lockedfile-test-")
Expect(err).ToNot(HaveOccurred())
states.Store("managedToOpenrwLockedFile", false)
woFile, err := lockedfile.OpenFile(path.Name(), os.O_WRONLY, 0)
Expect(err).ToNot(HaveOccurred())
defer woFile.Close()
go func() {
roFile, err := os.OpenFile(path.Name(), os.O_RDWR, 0)
Expect(err).ToNot(HaveOccurred())
defer roFile.Close()
states.Store("managedToOpenrwLockedFile", true)
}()

woFile.Close()
Eventually(func() bool { s, _ := states.Load("managedToOpenrwLockedFile"); return s.(bool) }).Should(BeTrue())
})
})

})
4 changes: 2 additions & 2 deletions pkg/storage/utils/decomposedfs/grants.go
Original file line number Diff line number Diff line change
@@ -134,7 +134,7 @@ func (fs *Decomposedfs) ListGrants(ctx context.Context, ref *provider.Reference)
return nil, errtypes.NotFound(f)
}
log := appctx.GetLogger(ctx)
var attrs map[string]string
var attrs node.Attributes
if attrs, err = grantNode.Xattrs(); err != nil {
log.Error().Err(err).Msg("error listing attributes")
return nil, err
@@ -312,7 +312,7 @@ func (fs *Decomposedfs) storeGrant(ctx context.Context, n *node.Node, g *provide
// set the grant
e := ace.FromGrant(g)
principal, value := e.Marshal()
if err := n.SetXattr(prefixes.GrantPrefix+principal, string(value)); err != nil {
if err := n.SetXattr(prefixes.GrantPrefix+principal, value); err != nil {
appctx.GetLogger(ctx).Error().Err(err).
Str("principal", principal).Msg("Could not set grant for principal")
return err
2 changes: 1 addition & 1 deletion pkg/storage/utils/decomposedfs/grants_test.go
Original file line number Diff line number Diff line change
@@ -138,7 +138,7 @@ var _ = Describe("Grants", func() {
Path: "/dir1",
})
Expect(err).ToNot(HaveOccurred())
attr, err := n.Xattr(prefixes.GrantUserAcePrefix + grant.Grantee.GetUserId().OpaqueId)
attr, err := n.XattrString(prefixes.GrantUserAcePrefix + grant.Grantee.GetUserId().OpaqueId)
Expect(err).ToNot(HaveOccurred())
Expect(attr).To(Equal(fmt.Sprintf("\x00t=A:f=:p=rw:c=%s:e=0\n", o.GetOpaqueId()))) // NOTE: this tests ace package
})
76 changes: 25 additions & 51 deletions pkg/storage/utils/decomposedfs/lookup/lookup.go
Original file line number Diff line number Diff line change
@@ -23,7 +23,6 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"

provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
@@ -33,9 +32,8 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
"github.com/cs3org/reva/v2/pkg/storage/utils/filelocks"
"github.com/gofrs/flock"
"github.com/pkg/errors"
"github.com/rogpeppe/go-internal/lockedfile"
)

// Lookup implements transformations from filepath to node and back
@@ -60,14 +58,10 @@ func (lu *Lookup) MetadataBackend() metadata.Backend {

// ReadBlobSizeAttr reads the blobsize from the xattrs
func (lu *Lookup) ReadBlobSizeAttr(path string) (int64, error) {
attr, err := lu.metadataBackend.Get(path, prefixes.BlobsizeAttr)
blobSize, err := lu.metadataBackend.GetInt64(path, prefixes.BlobsizeAttr)
if err != nil {
return 0, errors.Wrapf(err, "error reading blobsize xattr")
}
blobSize, err := strconv.ParseInt(attr, 10, 64)
if err != nil {
return 0, errors.Wrapf(err, "invalid blobsize xattr format")
}
return blobSize, nil
}

@@ -77,22 +71,18 @@ func (lu *Lookup) ReadBlobIDAttr(path string) (string, error) {
if err != nil {
return "", errors.Wrapf(err, "error reading blobid xattr")
}
return attr, nil
return string(attr), nil
}

// TypeFromPath returns the type of the node at the given path
func (lu *Lookup) TypeFromPath(path string) provider.ResourceType {
// Try to read from xattrs
typeAttr, err := lu.metadataBackend.Get(path, prefixes.TypeAttr)
t := provider.ResourceType_RESOURCE_TYPE_INVALID
typeAttr, err := lu.metadataBackend.GetInt64(path, prefixes.TypeAttr)
if err == nil {
typeInt, err := strconv.ParseInt(typeAttr, 10, 32)
if err != nil {
return t
}
return provider.ResourceType(typeInt)
return provider.ResourceType(int32(typeAttr))
}

t := provider.ResourceType_RESOURCE_TYPE_INVALID
// Fall back to checking on disk
fi, err := os.Lstat(path)
if err != nil {
@@ -277,69 +267,53 @@ func refFromCS3(b []byte) (*provider.Reference, error) {
// CopyMetadata copies all extended attributes from source to target.
// The optional filter function can be used to filter by attribute name, e.g. by checking a prefix
// For the source file, a shared lock is acquired.
// NOTE: target resource is not locked! You need to acquire a write lock on the target additionally
// NOTE: target resource will be write locked!
func (lu *Lookup) CopyMetadata(src, target string, filter func(attributeName string) bool) (err error) {
var readLock *flock.Flock

// Acquire a read log on the source node
readLock, err = filelocks.AcquireReadLock(src)
// write lock existing node before reading treesize or tree time
f, err := lockedfile.Open(lu.MetadataBackend().MetadataPath(src))
if err != nil {
return err
}

if err != nil {
return errors.Wrap(err, "xattrs: Unable to lock source to read")
}
defer func() {
rerr := filelocks.ReleaseLock(readLock)
rerr := f.Close()

// if err is non nil we do not overwrite that
if err == nil {
err = rerr
}
}()

return lu.CopyMetadataWithSourceLock(src, target, filter, readLock)
return lu.CopyMetadataWithSourceLock(src, target, filter, f)
}

// CopyMetadataWithSourceLock copies all extended attributes from source to target.
// The optional filter function can be used to filter by attribute name, e.g. by checking a prefix
// For the source file, a shared lock is acquired.
// NOTE: target resource is not locked! You need to acquire a write lock on the target additionally
func (lu *Lookup) CopyMetadataWithSourceLock(src, target string, filter func(attributeName string) bool, readLock *flock.Flock) (err error) {
// For the source file, a matching lockedfile is required.
// NOTE: target resource will be write locked!
func (lu *Lookup) CopyMetadataWithSourceLock(source, target string, filter func(attributeName string) bool, readLock *lockedfile.File) (err error) {
switch {
case readLock == nil:
return errors.New("no lock provided")
case readLock.Path() != filelocks.FlockFile(src):
case readLock.File.Name() != lu.MetadataBackend().MetadataPath(source):
return errors.New("lockpath does not match filepath")
case !readLock.Locked() && !readLock.RLocked(): // we need either a read or a write lock
return errors.New("not locked")
}

// both locks are established. Copy.
var attrNameList []string
if attrNameList, err = lu.metadataBackend.List(src); err != nil {
return errors.Wrap(err, "Can not get xattr listing on src")
attrs, err := lu.metadataBackend.All(source)
if err != nil {
return err
}

// error handling: We count errors of reads or writes of xattrs.
// if there were any read or write errors an error is returned.
var (
xerrs = 0
xerr error
)
for idx := range attrNameList {
attrName := attrNameList[idx]
newAttrs := make(map[string][]byte, 0)
for attrName, val := range attrs {
if filter == nil || filter(attrName) {
var attrVal string
if attrVal, xerr = lu.metadataBackend.Get(src, attrName); xerr != nil {
xerrs++
}
if xerr = lu.metadataBackend.Set(target, attrName, attrVal); xerr != nil {
xerrs++
}
newAttrs[attrName] = val
}
}
if xerrs > 0 {
err = errors.Wrap(xerr, "failed to copy all xattrs, last error returned")
}

return err
return lu.MetadataBackend().SetMultiple(target, newAttrs, true)
}
2 changes: 1 addition & 1 deletion pkg/storage/utils/decomposedfs/metadata.go
Original file line number Diff line number Diff line change
@@ -111,7 +111,7 @@ func (fs *Decomposedfs) SetArbitraryMetadata(ctx context.Context, ref *provider.
}
for k, v := range md.Metadata {
attrName := prefixes.MetadataPrefix + k
if err = n.SetXattr(attrName, v); err != nil {
if err = n.SetXattrString(attrName, v); err != nil {
errs = append(errs, errors.Wrap(err, "Decomposedfs: could not set metadata attribute "+attrName+" to "+k))
}
}
Loading