Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decomposed FS precondition failed locking #2709

Merged
merged 3 commits into from
Apr 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions changelog/unreleased/set-lock-precondition.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Bugfix: Decomposed FS: return precondition failed if already locked

We've fixed the return code from permission denied to precondition failed if a
user tries to lock an already locked file.

https://github.com/cs3org/reva/pull/2709
2 changes: 1 addition & 1 deletion internal/http/services/owncloud/ocdav/locks.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ func (s *svc) lockReference(ctx context.Context, w http.ResponseWriter, r *http.
// this actually is a name based lock ... ugh
token, err = s.LockSystem.Create(ctx, now, ld)
if err != nil {
if _, ok := err.(errtypes.Locked); ok {
if _, ok := err.(errtypes.PreconditionFailed); ok {
return http.StatusLocked, err
}
return http.StatusInternalServerError, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/utils/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ func (fs *Decomposedfs) GetLock(ctx context.Context, ref *provider.Reference) (*
case !ok:
return nil, errtypes.PermissionDenied(filepath.Join(node.ParentID, node.Name))
}
return node.ReadLock(ctx)
return node.ReadLock(ctx, false)
}

// SetLock puts a lock on the given reference
Expand Down
59 changes: 32 additions & 27 deletions pkg/storage/utils/decomposedfs/node/locks.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,26 +39,14 @@ import (
// SetLock sets a lock on the node
func (n *Node) SetLock(ctx context.Context, lock *provider.Lock) error {
lockFilePath := n.LockFilePath()
// check existing lock

if l, _ := n.ReadLock(ctx); l != nil {
lockID, _ := ctxpkg.ContextGetLockID(ctx)
if l.LockId != lockID {
return errtypes.Locked(l.LockId)
}

err := os.Remove(lockFilePath)
if err != nil {
return err
}
}

// ensure parent path exists
if err := os.MkdirAll(filepath.Dir(lockFilePath), 0700); err != nil {
return errors.Wrap(err, "Decomposedfs: error creating parent folder for lock")
}
fileLock, err := filelocks.AcquireWriteLock(n.InternalPath())

// get file lock, so that nobody can create the lock in the meantime
fileLock, err := filelocks.AcquireWriteLock(n.InternalPath())
if err != nil {
return err
}
Expand All @@ -72,6 +60,19 @@ func (n *Node) SetLock(ctx context.Context, lock *provider.Lock) error {
}
}()

// check if already locked
l, err := n.ReadLock(ctx, true) // we already have a write file lock, so ReadLock() would fail to acquire a read file lock -> skip it
switch err.(type) {
case errtypes.NotFound:
// file not locked, continue
case nil:
if l != nil {
return errtypes.PreconditionFailed("already locked")
}
default:
return errors.Wrap(err, "Decomposedfs: could check if file already is locked")
}

// O_EXCL to make open fail when the file already exists
f, err := os.OpenFile(lockFilePath, os.O_EXCL|os.O_CREATE|os.O_WRONLY, 0600)
if err != nil {
Expand All @@ -87,26 +88,30 @@ func (n *Node) SetLock(ctx context.Context, lock *provider.Lock) error {
}

// ReadLock reads the lock id for a node
func (n Node) ReadLock(ctx context.Context) (*provider.Lock, error) {
func (n Node) ReadLock(ctx context.Context, skipFileLock bool) (*provider.Lock, error) {

// ensure parent path exists
if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil {
return nil, errors.Wrap(err, "Decomposedfs: error creating parent folder for lock")
}
fileLock, err := filelocks.AcquireReadLock(n.InternalPath())

if err != nil {
return nil, err
}

defer func() {
rerr := filelocks.ReleaseLock(fileLock)
// the caller of ReadLock already may hold a file lock
if !skipFileLock {
fileLock, err := filelocks.AcquireReadLock(n.InternalPath())

// if err is non nil we do not overwrite that
if err == nil {
err = rerr
if err != nil {
return nil, err
}
}()

defer func() {
rerr := filelocks.ReleaseLock(fileLock)

// if err is non nil we do not overwrite that
if err == nil {
err = rerr
}
}()
}

f, err := os.Open(n.LockFilePath())
if err != nil {
Expand Down Expand Up @@ -241,7 +246,7 @@ func (n *Node) Unlock(ctx context.Context, lock *provider.Lock) error {
// CheckLock compares the context lock with the node lock
func (n *Node) CheckLock(ctx context.Context) error {
lockID, _ := ctxpkg.ContextGetLockID(ctx)
lock, _ := n.ReadLock(ctx)
lock, _ := n.ReadLock(ctx, false)
if lock != nil {
switch lockID {
case "":
Expand Down
42 changes: 11 additions & 31 deletions pkg/storage/utils/decomposedfs/node/locks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
helpers "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/testhelpers"
)
Expand Down Expand Up @@ -102,25 +103,14 @@ var _ = Describe("Node locks", func() {
Expect(err).ToNot(HaveOccurred())
})

It("refuses to lock if already locked an existing lock was not provided", func() {
It("refuses to set a lock if already locked", func() {
err := n.SetLock(env.Ctx, lockByUser)
Expect(err).ToNot(HaveOccurred())

err = n.SetLock(env.Ctx, lockByUser)
Expect(err).To(HaveOccurred())

env.Ctx = ctxpkg.ContextSetLockID(env.Ctx, wrongLockByUser.LockId)
err = n.SetLock(env.Ctx, lockByUser)
Expect(err).To(HaveOccurred())
})

It("relocks if the existing lock was provided", func() {
err := n.SetLock(env.Ctx, lockByUser)
Expect(err).ToNot(HaveOccurred())

env.Ctx = ctxpkg.ContextSetLockID(env.Ctx, lockByUser.LockId)
err = n.SetLock(env.Ctx, lockByUser)
Expect(err).ToNot(HaveOccurred())
_, ok := err.(errtypes.PreconditionFailed)
Expect(ok).To(BeTrue())
})
})

Expand All @@ -136,26 +126,16 @@ var _ = Describe("Node locks", func() {
Expect(err).ToNot(HaveOccurred())
})

It("refuses to lock if already locked an existing lock was not provided", func() {
It("refuses to set a lock if already locked", func() {
err := n.SetLock(env.Ctx, lockByApp)
Expect(err).ToNot(HaveOccurred())

err = n.SetLock(env.Ctx, lockByApp)
Expect(err).To(HaveOccurred())

env.Ctx = ctxpkg.ContextSetLockID(env.Ctx, wrongLockByApp.LockId)
err = n.SetLock(env.Ctx, lockByApp)
Expect(err).To(HaveOccurred())
_, ok := err.(errtypes.PreconditionFailed)
Expect(ok).To(BeTrue())
})

It("relocks if the existing lock was provided", func() {
err := n.SetLock(env.Ctx, lockByApp)
Expect(err).ToNot(HaveOccurred())

env.Ctx = ctxpkg.ContextSetLockID(env.Ctx, lockByApp.LockId)
err = n.SetLock(env.Ctx, lockByApp)
Expect(err).ToNot(HaveOccurred())
})
})

Context("with an existing lock for a user", func() {
Expand All @@ -166,13 +146,13 @@ var _ = Describe("Node locks", func() {

Describe("ReadLock", func() {
It("returns the lock", func() {
l, err := n.ReadLock(env.Ctx)
l, err := n.ReadLock(env.Ctx, false)
Expect(err).ToNot(HaveOccurred())
Expect(l).To(Equal(lockByUser))
})

It("reports an error when the node wasn't locked", func() {
_, err := n2.ReadLock(env.Ctx)
_, err := n2.ReadLock(env.Ctx, false)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("no lock found"))
})
Expand Down Expand Up @@ -270,13 +250,13 @@ var _ = Describe("Node locks", func() {

Describe("ReadLock", func() {
It("returns the lock", func() {
l, err := n.ReadLock(env.Ctx)
l, err := n.ReadLock(env.Ctx, false)
Expect(err).ToNot(HaveOccurred())
Expect(l).To(Equal(lockByApp))
})

It("reports an error when the node wasn't locked", func() {
_, err := n2.ReadLock(env.Ctx)
_, err := n2.ReadLock(env.Ctx, false)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("no lock found"))
})
Expand Down