Skip to content

Commit

Permalink
Decomposed FS precondition failed locking (#2709)
Browse files Browse the repository at this point in the history
* return precondition failed if already locked (decomposed fs)

* ensure right error code when already locked

* use precondition failed in webdav too
  • Loading branch information
wkloucek authored Apr 1, 2022
1 parent c025b2a commit 8ffc980
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 60 deletions.
6 changes: 6 additions & 0 deletions changelog/unreleased/set-lock-precondition.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Bugfix: Decomposed FS: return precondition failed if already locked

We've fixed the return code from permission denied to precondition failed if a
user tries to lock an already locked file.

https://github.com/cs3org/reva/pull/2709
2 changes: 1 addition & 1 deletion internal/http/services/owncloud/ocdav/locks.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ func (s *svc) lockReference(ctx context.Context, w http.ResponseWriter, r *http.
// this actually is a name based lock ... ugh
token, err = s.LockSystem.Create(ctx, now, ld)
if err != nil {
if _, ok := err.(errtypes.Locked); ok {
if _, ok := err.(errtypes.PreconditionFailed); ok {
return http.StatusLocked, err
}
return http.StatusInternalServerError, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/utils/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ func (fs *Decomposedfs) GetLock(ctx context.Context, ref *provider.Reference) (*
case !ok:
return nil, errtypes.PermissionDenied(filepath.Join(node.ParentID, node.Name))
}
return node.ReadLock(ctx)
return node.ReadLock(ctx, false)
}

// SetLock puts a lock on the given reference
Expand Down
59 changes: 32 additions & 27 deletions pkg/storage/utils/decomposedfs/node/locks.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,26 +39,14 @@ import (
// SetLock sets a lock on the node
func (n *Node) SetLock(ctx context.Context, lock *provider.Lock) error {
lockFilePath := n.LockFilePath()
// check existing lock

if l, _ := n.ReadLock(ctx); l != nil {
lockID, _ := ctxpkg.ContextGetLockID(ctx)
if l.LockId != lockID {
return errtypes.Locked(l.LockId)
}

err := os.Remove(lockFilePath)
if err != nil {
return err
}
}

// ensure parent path exists
if err := os.MkdirAll(filepath.Dir(lockFilePath), 0700); err != nil {
return errors.Wrap(err, "Decomposedfs: error creating parent folder for lock")
}
fileLock, err := filelocks.AcquireWriteLock(n.InternalPath())

// get file lock, so that nobody can create the lock in the meantime
fileLock, err := filelocks.AcquireWriteLock(n.InternalPath())
if err != nil {
return err
}
Expand All @@ -72,6 +60,19 @@ func (n *Node) SetLock(ctx context.Context, lock *provider.Lock) error {
}
}()

// check if already locked
l, err := n.ReadLock(ctx, true) // we already have a write file lock, so ReadLock() would fail to acquire a read file lock -> skip it
switch err.(type) {
case errtypes.NotFound:
// file not locked, continue
case nil:
if l != nil {
return errtypes.PreconditionFailed("already locked")
}
default:
return errors.Wrap(err, "Decomposedfs: could check if file already is locked")
}

// O_EXCL to make open fail when the file already exists
f, err := os.OpenFile(lockFilePath, os.O_EXCL|os.O_CREATE|os.O_WRONLY, 0600)
if err != nil {
Expand All @@ -87,26 +88,30 @@ func (n *Node) SetLock(ctx context.Context, lock *provider.Lock) error {
}

// ReadLock reads the lock id for a node
func (n Node) ReadLock(ctx context.Context) (*provider.Lock, error) {
func (n Node) ReadLock(ctx context.Context, skipFileLock bool) (*provider.Lock, error) {

// ensure parent path exists
if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil {
return nil, errors.Wrap(err, "Decomposedfs: error creating parent folder for lock")
}
fileLock, err := filelocks.AcquireReadLock(n.InternalPath())

if err != nil {
return nil, err
}

defer func() {
rerr := filelocks.ReleaseLock(fileLock)
// the caller of ReadLock already may hold a file lock
if !skipFileLock {
fileLock, err := filelocks.AcquireReadLock(n.InternalPath())

// if err is non nil we do not overwrite that
if err == nil {
err = rerr
if err != nil {
return nil, err
}
}()

defer func() {
rerr := filelocks.ReleaseLock(fileLock)

// if err is non nil we do not overwrite that
if err == nil {
err = rerr
}
}()
}

f, err := os.Open(n.LockFilePath())
if err != nil {
Expand Down Expand Up @@ -241,7 +246,7 @@ func (n *Node) Unlock(ctx context.Context, lock *provider.Lock) error {
// CheckLock compares the context lock with the node lock
func (n *Node) CheckLock(ctx context.Context) error {
lockID, _ := ctxpkg.ContextGetLockID(ctx)
lock, _ := n.ReadLock(ctx)
lock, _ := n.ReadLock(ctx, false)
if lock != nil {
switch lockID {
case "":
Expand Down
42 changes: 11 additions & 31 deletions pkg/storage/utils/decomposedfs/node/locks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
helpers "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/testhelpers"
)
Expand Down Expand Up @@ -102,25 +103,14 @@ var _ = Describe("Node locks", func() {
Expect(err).ToNot(HaveOccurred())
})

It("refuses to lock if already locked an existing lock was not provided", func() {
It("refuses to set a lock if already locked", func() {
err := n.SetLock(env.Ctx, lockByUser)
Expect(err).ToNot(HaveOccurred())

err = n.SetLock(env.Ctx, lockByUser)
Expect(err).To(HaveOccurred())

env.Ctx = ctxpkg.ContextSetLockID(env.Ctx, wrongLockByUser.LockId)
err = n.SetLock(env.Ctx, lockByUser)
Expect(err).To(HaveOccurred())
})

It("relocks if the existing lock was provided", func() {
err := n.SetLock(env.Ctx, lockByUser)
Expect(err).ToNot(HaveOccurred())

env.Ctx = ctxpkg.ContextSetLockID(env.Ctx, lockByUser.LockId)
err = n.SetLock(env.Ctx, lockByUser)
Expect(err).ToNot(HaveOccurred())
_, ok := err.(errtypes.PreconditionFailed)
Expect(ok).To(BeTrue())
})
})

Expand All @@ -136,26 +126,16 @@ var _ = Describe("Node locks", func() {
Expect(err).ToNot(HaveOccurred())
})

It("refuses to lock if already locked an existing lock was not provided", func() {
It("refuses to set a lock if already locked", func() {
err := n.SetLock(env.Ctx, lockByApp)
Expect(err).ToNot(HaveOccurred())

err = n.SetLock(env.Ctx, lockByApp)
Expect(err).To(HaveOccurred())

env.Ctx = ctxpkg.ContextSetLockID(env.Ctx, wrongLockByApp.LockId)
err = n.SetLock(env.Ctx, lockByApp)
Expect(err).To(HaveOccurred())
_, ok := err.(errtypes.PreconditionFailed)
Expect(ok).To(BeTrue())
})

It("relocks if the existing lock was provided", func() {
err := n.SetLock(env.Ctx, lockByApp)
Expect(err).ToNot(HaveOccurred())

env.Ctx = ctxpkg.ContextSetLockID(env.Ctx, lockByApp.LockId)
err = n.SetLock(env.Ctx, lockByApp)
Expect(err).ToNot(HaveOccurred())
})
})

Context("with an existing lock for a user", func() {
Expand All @@ -166,13 +146,13 @@ var _ = Describe("Node locks", func() {

Describe("ReadLock", func() {
It("returns the lock", func() {
l, err := n.ReadLock(env.Ctx)
l, err := n.ReadLock(env.Ctx, false)
Expect(err).ToNot(HaveOccurred())
Expect(l).To(Equal(lockByUser))
})

It("reports an error when the node wasn't locked", func() {
_, err := n2.ReadLock(env.Ctx)
_, err := n2.ReadLock(env.Ctx, false)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("no lock found"))
})
Expand Down Expand Up @@ -270,13 +250,13 @@ var _ = Describe("Node locks", func() {

Describe("ReadLock", func() {
It("returns the lock", func() {
l, err := n.ReadLock(env.Ctx)
l, err := n.ReadLock(env.Ctx, false)
Expect(err).ToNot(HaveOccurred())
Expect(l).To(Equal(lockByApp))
})

It("reports an error when the node wasn't locked", func() {
_, err := n2.ReadLock(env.Ctx)
_, err := n2.ReadLock(env.Ctx, false)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("no lock found"))
})
Expand Down

0 comments on commit 8ffc980

Please sign in to comment.