Use storage-relative paths as the cache key
This prevents mismatched cache keys when multiple providers mount the same
storage at different locations.
aduffeck committed Mar 13, 2023
1 parent ca06d06 commit 719e536
Showing 14 changed files with 242 additions and 160 deletions.
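In other words, the cache key is derived from the path relative to the storage root rather than from the absolute path. A minimal sketch of the idea in Go, with hypothetical helper and path names (this is not the code from the diff below):

package main

import (
	"fmt"
	"path/filepath"
)

// cacheKey is a hypothetical helper: it turns an absolute metadata path into a
// key relative to the storage root, so the mount location no longer matters.
func cacheKey(storageRoot, metadataPath string) string {
	key, err := filepath.Rel(storageRoot, metadataPath)
	if err != nil {
		return metadataPath // fall back to the absolute path
	}
	return key
}

func main() {
	// Both calls print "spaces/abc/nodes/def.ini", so the caches stay aligned.
	fmt.Println(cacheKey("/srv/node-a", "/srv/node-a/spaces/abc/nodes/def.ini"))
	fmt.Println(cacheKey("/mnt/node-b", "/mnt/node-b/spaces/abc/nodes/def.ini"))
}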
5 changes: 5 additions & 0 deletions changelog/unreleased/fix-ini-backend.md
@@ -0,0 +1,5 @@
Bugfix: Fix ini backend

We fixed a few issues in the ini backend regarding encoding and cache invalidation.

https://github.com/cs3org/reva/pull/3711
1 change: 1 addition & 0 deletions pkg/storage/cache/cache.go
@@ -85,6 +85,7 @@ type CreatePersonalSpaceCache interface {
// FileMetadataCache handles file metadata
type FileMetadataCache interface {
Cache
RemoveMetadata(path string) error
}

// GetStatCache will return an existing StatCache for the given store, nodes, database and table
16 changes: 16 additions & 0 deletions pkg/storage/cache/filemetadata.go
@@ -19,6 +19,7 @@
package cache

import (
"strings"
"time"
)

@@ -36,3 +37,18 @@ func NewFileMetadataCache(store string, nodes []string, database, table string,

return c
}

// RemoveMetadata removes a path and all cache entries below it from the metadata cache
func (c *fileMetadataCache) RemoveMetadata(path string) error {
keys, err := c.List()
if err != nil {
return err
}

for _, key := range keys {
if strings.HasPrefix(key, path) {
_ = c.Delete(key)
}
}
return nil
}
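With storage-relative paths as keys, invalidating a whole subtree is a plain prefix match over the cached keys, which is what RemoveMetadata above does. A hedged usage sketch, with the cache construction omitted and an illustrative path:

package example

// metadataCache mirrors the relevant part of the FileMetadataCache interface
// from the diff above; obtaining the concrete cache is omitted.
type metadataCache interface {
	RemoveMetadata(path string) error
}

// invalidateSubtree sketches how a caller could purge every cached entry below
// a node now that keys are storage-relative paths. The path literal is illustrative.
func invalidateSubtree(c metadataCache) error {
	return c.RemoveMetadata("spaces/abc/nodes/def")
}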
2 changes: 1 addition & 1 deletion pkg/storage/utils/decomposedfs/decomposedfs.go
@@ -108,7 +108,7 @@ func NewDefault(m map[string]interface{}, bs tree.Blobstore, es events.Stream) (
case "xattrs":
lu = lookup.New(metadata.XattrsBackend{}, o)
case "ini":
-lu = lookup.New(metadata.NewIniBackend(o.FileMetadataCache), o)
+lu = lookup.New(metadata.NewIniBackend(o.Root, o.FileMetadataCache), o)
default:
return nil, fmt.Errorf("unknown metadata backend %s, only 'ini' or 'xattrs' (default) supported", o.MetadataBackend)
}
20 changes: 20 additions & 0 deletions pkg/storage/utils/decomposedfs/decomposedfs_concurrency_test.go
@@ -112,6 +112,26 @@ var _ = Describe("Decomposed", func() {
woFile.Close()
Eventually(func() bool { s, _ := states.Load("managedToOpenroFile"); return s.(bool) }).Should(BeTrue())
})

It("allows opening rw while an exclusive lock is being held", func() {
states := sync.Map{}

path, err := os.CreateTemp("", "decomposedfs-lockedfile-test-")
Expect(err).ToNot(HaveOccurred())
states.Store("managedToOpenrwLockedFile", false)
woFile, err := lockedfile.OpenFile(path.Name(), os.O_WRONLY, 0)
Expect(err).ToNot(HaveOccurred())
defer woFile.Close()
go func() {
roFile, err := os.OpenFile(path.Name(), os.O_RDWR, 0)
Expect(err).ToNot(HaveOccurred())
defer roFile.Close()
states.Store("managedToOpenrwLockedFile", true)
}()

woFile.Close()
Eventually(func() bool { s, _ := states.Load("managedToOpenrwLockedFile"); return s.(bool) }).Should(BeTrue())
})
})

})
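The new test relies on lockedfile using advisory (flock-style) locks: an exclusive lockedfile lock only serializes other lockedfile users, while a plain os.OpenFile is not blocked by it. A minimal sketch of that behaviour, assuming the same lockedfile package:

package main

import (
	"fmt"
	"os"

	"github.com/rogpeppe/go-internal/lockedfile"
)

func main() {
	tmp, err := os.CreateTemp("", "lock-demo-")
	if err != nil {
		panic(err)
	}
	tmp.Close()
	defer os.Remove(tmp.Name())

	// Hold an exclusive (write) lock on the file via lockedfile.
	locked, err := lockedfile.OpenFile(tmp.Name(), os.O_WRONLY, 0)
	if err != nil {
		panic(err)
	}
	defer locked.Close()

	// A plain open is not a lockedfile call, so the advisory lock does not block it.
	plain, err := os.OpenFile(tmp.Name(), os.O_RDWR, 0)
	fmt.Println("plain rw open while exclusively locked:", err == nil)
	if plain != nil {
		plain.Close()
	}
}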
57 changes: 20 additions & 37 deletions pkg/storage/utils/decomposedfs/lookup/lookup.go
@@ -33,9 +33,8 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
-"github.com/cs3org/reva/v2/pkg/storage/utils/filelocks"
-"github.com/gofrs/flock"
"github.com/pkg/errors"
+"github.com/rogpeppe/go-internal/lockedfile"
)

// Lookup implements transformations from filepath to node and back
@@ -277,69 +276,53 @@ func refFromCS3(b []byte) (*provider.Reference, error) {
// CopyMetadata copies all extended attributes from source to target.
// The optional filter function can be used to filter by attribute name, e.g. by checking a prefix
// For the source file, a shared lock is acquired.
-// NOTE: target resource is not locked! You need to acquire a write lock on the target additionally
+// NOTE: target resource will be write locked!
func (lu *Lookup) CopyMetadata(src, target string, filter func(attributeName string) bool) (err error) {
-var readLock *flock.Flock
-
-// Acquire a read log on the source node
-readLock, err = filelocks.AcquireReadLock(src)
+// write lock existing node before reading treesize or tree time
+f, err := lockedfile.Open(lu.MetadataBackend().MetadataPath(src))
+if err != nil {
+return err
+}
-
-if err != nil {
-return errors.Wrap(err, "xattrs: Unable to lock source to read")
-}
defer func() {
-rerr := filelocks.ReleaseLock(readLock)
+rerr := f.Close()

// if err is non nil we do not overwrite that
if err == nil {
err = rerr
}
}()

-return lu.CopyMetadataWithSourceLock(src, target, filter, readLock)
+return lu.CopyMetadataWithSourceLock(src, target, filter, f)
}

// CopyMetadataWithSourceLock copies all extended attributes from source to target.
// The optional filter function can be used to filter by attribute name, e.g. by checking a prefix
-// For the source file, a shared lock is acquired.
-// NOTE: target resource is not locked! You need to acquire a write lock on the target additionally
-func (lu *Lookup) CopyMetadataWithSourceLock(src, target string, filter func(attributeName string) bool, readLock *flock.Flock) (err error) {
+// For the source file, a matching lockedfile is required.
+// NOTE: target resource will be write locked!
+func (lu *Lookup) CopyMetadataWithSourceLock(source, target string, filter func(attributeName string) bool, readLock *lockedfile.File) (err error) {
switch {
case readLock == nil:
return errors.New("no lock provided")
-case readLock.Path() != filelocks.FlockFile(src):
+case readLock.File.Name() != lu.MetadataBackend().MetadataPath(source):
return errors.New("lockpath does not match filepath")
-case !readLock.Locked() && !readLock.RLocked(): // we need either a read or a write lock
-return errors.New("not locked")
}

-// both locks are established. Copy.
-var attrNameList []string
-if attrNameList, err = lu.metadataBackend.List(src); err != nil {
-return errors.Wrap(err, "Can not get xattr listing on src")
+attrs, err := lu.metadataBackend.All(source)
+if err != nil {
+return err
}

-// error handling: We count errors of reads or writes of xattrs.
-// if there were any read or write errors an error is returned.
-var (
-xerrs = 0
-xerr error
-)
-for idx := range attrNameList {
-attrName := attrNameList[idx]
+newAttrs := make(map[string]string, 0)
+for attrName, val := range attrs {
if filter == nil || filter(attrName) {
-var attrVal string
-if attrVal, xerr = lu.metadataBackend.Get(src, attrName); xerr != nil {
-xerrs++
-}
-if xerr = lu.metadataBackend.Set(target, attrName, attrVal); xerr != nil {
-xerrs++
-}
+newAttrs[attrName] = val
}
}
-if xerrs > 0 {
-err = errors.Wrap(xerr, "failed to copy all xattrs, last error returned")
-}

-return err
+return lu.MetadataBackend().SetMultiple(target, newAttrs, true)
}
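From a caller's perspective the reworked CopyMetadata hides the locking entirely: the source is locked through its metadata file and the filtered attributes land on the target in a single SetMultiple call. A hedged usage sketch, with an illustrative attribute prefix and function name:

package example

import (
	"strings"

	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
)

// copyGrantAttributes sketches a call to the reworked CopyMetadata: only
// attributes with a given prefix are copied from src to target. The prefix
// literal and the helper name are illustrative.
func copyGrantAttributes(lu *lookup.Lookup, srcPath, targetPath string) error {
	return lu.CopyMetadata(srcPath, targetPath, func(name string) bool {
		return strings.HasPrefix(name, "user.ocis.grant.")
	})
}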