Skip to content

Commit

Permalink
Fix writing 0 byte msgpack metadata (#4033)
Browse files Browse the repository at this point in the history
* Prevent 0-byte metadata files

* Ignore reading non-existent file

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

* actually, we are the ones creating empty files on purpose :(

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

* Lock the proper file

* Do not tolerate existing 0-byte files. We do not generate them anymore

* Only tolerate ErrNotExist errors

* Fix rebase error

* Handle error

* Add changelog

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

* Adapt IsMetaFile to new locking strategy

* always touch metadata file

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

* Fix unit tests

* Do not try to read metadata from the lock file

* Filter .lock files when listing revisions

* Prevent lockfile name clashes

---------

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>
Co-authored-by: André Duffeck <[email protected]>
  • Loading branch information
butonic and aduffeck authored Jul 4, 2023
1 parent ea9b5ba commit 5e7f597
Show file tree
Hide file tree
Showing 11 changed files with 87 additions and 87 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/fix-0-byte-msgpack.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bugfix: fix writing 0 byte msgpack metadata

File metadata is now written atomically to be more resilient during timeouts

https://github.com/cs3org/reva/pull/4033
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 // indirect
github.com/google/renameio/v2 v2.0.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.13.0 // indirect
github.com/hashicorp/consul/api v1.15.2 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,8 @@ github.com/google/pprof v0.0.0-20210609004039-a478d1d731e9/go.mod h1:kpwsk12EmLe
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec=
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/renameio/v2 v2.0.0 h1:UifI23ZTGY8Tt29JbYFiuyIU3eX+RNFtUwefq9qAhxg=
github.com/google/renameio/v2 v2.0.0/go.mod h1:BtmJXm5YlszgC+TD4HOEEUFgkJP3nLxehU6hfe7jRt4=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
Expand Down
10 changes: 5 additions & 5 deletions pkg/storage/utils/decomposedfs/lookup/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func refFromCS3(b []byte) (*provider.Reference, error) {
func (lu *Lookup) CopyMetadata(ctx context.Context, src, target string, filter func(attributeName string) bool) (err error) {
// Acquire a read lock on the source node
// write lock existing node before reading treesize or tree time
f, err := lockedfile.Open(lu.MetadataBackend().MetadataPath(src))
lock, err := lockedfile.OpenFile(lu.MetadataBackend().LockfilePath(src), os.O_RDONLY|os.O_CREATE, 0600)
if err != nil {
return err
}
Expand All @@ -293,15 +293,15 @@ func (lu *Lookup) CopyMetadata(ctx context.Context, src, target string, filter f
return errors.Wrap(err, "xattrs: Unable to lock source to read")
}
defer func() {
rerr := f.Close()
rerr := lock.Close()

// if err is non nil we do not overwrite that
if err == nil {
err = rerr
}
}()

return lu.CopyMetadataWithSourceLock(ctx, src, target, filter, f)
return lu.CopyMetadataWithSourceLock(ctx, src, target, filter, lock)
}

// CopyMetadataWithSourceLock copies all extended attributes from source to target.
Expand All @@ -312,11 +312,11 @@ func (lu *Lookup) CopyMetadataWithSourceLock(ctx context.Context, sourcePath, ta
switch {
case lockedSource == nil:
return errors.New("no lock provided")
case lockedSource.File.Name() != lu.MetadataBackend().MetadataPath(sourcePath):
case lockedSource.File.Name() != lu.MetadataBackend().LockfilePath(sourcePath):
return errors.New("lockpath does not match filepath")
}

attrs, err := lu.metadataBackend.AllWithLockedSource(ctx, sourcePath, lockedSource)
attrs, err := lu.metadataBackend.All(ctx, sourcePath)
if err != nil {
return err
}
Expand Down
83 changes: 45 additions & 38 deletions pkg/storage/utils/decomposedfs/metadata/messagepack_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@ package metadata

import (
"context"
"errors"
"io"
"io/fs"
"os"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/cs3org/reva/v2/pkg/storage/cache"
"github.com/google/renameio/v2"
"github.com/pkg/xattr"
"github.com/rogpeppe/go-internal/lockedfile"
"github.com/shamaton/msgpack/v2"
Expand Down Expand Up @@ -142,74 +145,65 @@ func (b MessagePackBackend) saveAttributes(ctx context.Context, path string, set
span.End()
}()

lockPath := b.LockfilePath(path)
metaPath := b.MetadataPath(path)
if acquireLock {
_, subspan := tracer.Start(ctx, "lockedfile.OpenFile")
f, err = lockedfile.OpenFile(metaPath, os.O_RDWR|os.O_CREATE, 0600)
subspan.End()
} else {
_, subspan := tracer.Start(ctx, "os.OpenFile")
f, err = os.OpenFile(metaPath, os.O_RDWR|os.O_CREATE, 0600)
f, err = lockedfile.OpenFile(lockPath, os.O_RDWR|os.O_CREATE, 0600)
subspan.End()
defer f.Close()
}
if err != nil {
return err
}
defer f.Close()

// Invalidate cache early
_, subspan := tracer.Start(ctx, "metaCache.RemoveMetadata")
_ = b.metaCache.RemoveMetadata(b.cacheKey(path))
subspan.End()

// Read current state
_, subspan = tracer.Start(ctx, "io.ReadAll")
_, subspan = tracer.Start(ctx, "os.ReadFile")
var msgBytes []byte
msgBytes, err = io.ReadAll(f)
msgBytes, err = os.ReadFile(metaPath)
subspan.End()
if err != nil {
return err
}
attribs := map[string][]byte{}
if len(msgBytes) > 0 {
switch {
case err != nil:
if !errors.Is(err, fs.ErrNotExist) {
return err
}
case len(msgBytes) == 0:
// ugh. an empty file? bail out
return errors.New("encountered empty metadata file")
default:
// only unmarshal if we read data
err = msgpack.Unmarshal(msgBytes, &attribs)
if err != nil {
return err
}
}

// set new metadata
// prepare metadata
for key, val := range setAttribs {
attribs[key] = val
}
for _, key := range deleteAttribs {
delete(attribs, key)
}

// Truncate file
_, err = f.Seek(0, io.SeekStart)
if err != nil {
return err
}
_, subspan = tracer.Start(ctx, "f.Truncate")
err = f.Truncate(0)
subspan.End()
if err != nil {
return err
}

// Write new metadata to file
var d []byte
d, err = msgpack.Marshal(attribs)
if err != nil {
return err
}
_, subspan = tracer.Start(ctx, "f.Write")
_, err = f.Write(d)
subspan.End()

// overwrite file atomically
_, subspan = tracer.Start(ctx, "renameio.Writefile")
err = renameio.WriteFile(metaPath, d, 0600)
if err != nil {
return err
}
subspan.End()

_, subspan = tracer.Start(ctx, "metaCache.PushToCache")
err = b.metaCache.PushToCache(b.cacheKey(path), attribs)
Expand All @@ -227,9 +221,13 @@ func (b MessagePackBackend) loadAttributes(ctx context.Context, path string, sou
}

metaPath := b.MetadataPath(path)
var msgBytes []byte

if source == nil {
_, subspan := tracer.Start(ctx, "lockedfile.Open")
source, err = lockedfile.Open(metaPath)
// // No cached entry found. Read from storage and store in cache
_, subspan := tracer.Start(ctx, "os.OpenFile")
// source, err = lockedfile.Open(metaPath)
source, err = os.Open(metaPath)
subspan.End()
// // No cached entry found. Read from storage and store in cache
if err != nil {
Expand All @@ -246,12 +244,16 @@ func (b MessagePackBackend) loadAttributes(ctx context.Context, path string, sou
return attribs, nil // no attributes set yet
}
}
defer source.(*lockedfile.File).Close()
_, subspan = tracer.Start(ctx, "io.ReadAll")
msgBytes, err = io.ReadAll(source)
source.(*os.File).Close()
subspan.End()
} else {
_, subspan := tracer.Start(ctx, "io.ReadAll")
msgBytes, err = io.ReadAll(source)
subspan.End()
}

_, subspan := tracer.Start(ctx, "io.ReadAll")
msgBytes, err := io.ReadAll(source)
subspan.End()
if err != nil {
return nil, err
}
Expand All @@ -262,7 +264,7 @@ func (b MessagePackBackend) loadAttributes(ctx context.Context, path string, sou
}
}

_, subspan = tracer.Start(ctx, "metaCache.PushToCache")
_, subspan := tracer.Start(ctx, "metaCache.PushToCache")
err = b.metaCache.PushToCache(b.cacheKey(path), attribs)
subspan.End()
if err != nil {
Expand All @@ -273,7 +275,9 @@ func (b MessagePackBackend) loadAttributes(ctx context.Context, path string, sou
}

// IsMetaFile returns whether the given path represents a meta file
func (MessagePackBackend) IsMetaFile(path string) bool { return strings.HasSuffix(path, ".mpk") }
func (MessagePackBackend) IsMetaFile(path string) bool {
	// Both the msgpack metadata file and its companion lock file count as
	// meta files; the suffixes mirror what MetadataPath and LockfilePath append.
	for _, suffix := range []string{".mpk", ".mpk.lock"} {
		if strings.HasSuffix(path, suffix) {
			return true
		}
	}
	return false
}

// Purge purges the data of a given path
func (b MessagePackBackend) Purge(path string) error {
Expand Down Expand Up @@ -304,6 +308,9 @@ func (b MessagePackBackend) Rename(oldPath, newPath string) error {
// MetadataPath returns the location of the msgpack metadata file that belongs
// to the given path: the path itself with the ".mpk" suffix appended.
func (MessagePackBackend) MetadataPath(path string) string {
	const suffix = ".mpk"
	return path + suffix
}

// LockfilePath returns the location of the lock file for the given path:
// the path itself with the ".mpk.lock" suffix appended.
func (MessagePackBackend) LockfilePath(path string) string {
	const suffix = ".mpk.lock"
	return path + suffix
}

func (b MessagePackBackend) cacheKey(path string) string {
// rootPath is guaranteed to have no trailing slash
// the cache key shouldn't begin with a slash as some stores drop it which can cause
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/utils/decomposedfs/metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Backend interface {
Rename(oldPath, newPath string) error
IsMetaFile(path string) bool
MetadataPath(path string) string
LockfilePath(path string) string

AllWithLockedSource(ctx context.Context, path string, source io.Reader) (map[string][]byte, error)
}
Expand Down Expand Up @@ -110,6 +111,9 @@ func (NullBackend) Rename(oldPath, newPath string) error { return errUnconfigure
// MetadataPath returns the path of the file holding the metadata for the given path.
// The null backend ignores its argument and always yields the empty string.
func (NullBackend) MetadataPath(path string) string {
	return ""
}

// LockfilePath returns the path of the lock file for the given path.
// The null backend ignores its argument and always yields the empty string.
func (NullBackend) LockfilePath(path string) string {
	return ""
}

// AllWithLockedSource reads all extended attributes from the given reader
// The path argument is used for storing the data in the cache
func (NullBackend) AllWithLockedSource(ctx context.Context, path string, source io.Reader) (map[string][]byte, error) {
Expand Down
22 changes: 8 additions & 14 deletions pkg/storage/utils/decomposedfs/metadata/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@ import (

var _ = Describe("Backend", func() {
var (
tmpdir string
file string
metafile string
tmpdir string
file string

backend metadata.Backend
)
Expand All @@ -46,9 +45,6 @@ var _ = Describe("Backend", func() {

JustBeforeEach(func() {
file = path.Join(tmpdir, "file")
metafile = backend.MetadataPath(file)
_, err := os.Create(metafile)
Expect(err).ToNot(HaveOccurred())
})

AfterEach(func() {
Expand Down Expand Up @@ -147,10 +143,9 @@ var _ = Describe("Backend", func() {
Expect(v["bar"]).To(Equal([]byte("baz")))
})

It("returns an empty map", func() {
v, err := backend.All(context.Background(), file)
Expect(err).ToNot(HaveOccurred())
Expect(v).To(Equal(map[string][]byte{}))
It("fails when the metafile does not exist", func() {
_, err := backend.All(context.Background(), file)
Expect(err).To(HaveOccurred())
})
})

Expand All @@ -165,10 +160,9 @@ var _ = Describe("Backend", func() {
Expect(v).To(ConsistOf("foo", "bar"))
})

It("returns an empty list", func() {
v, err := backend.List(context.Background(), file)
Expect(err).ToNot(HaveOccurred())
Expect(v).To(Equal([]string{}))
It("fails when the metafile does not exist", func() {
_, err := backend.List(context.Background(), file)
Expect(err).To(HaveOccurred())
})
})

Expand Down
6 changes: 5 additions & 1 deletion pkg/storage/utils/decomposedfs/metadata/xattrs_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"os"
"path/filepath"
"strconv"
"strings"

"github.com/cs3org/reva/v2/pkg/storage/utils/filelocks"
"github.com/pkg/errors"
Expand Down Expand Up @@ -156,7 +157,7 @@ func (XattrsBackend) Remove(ctx context.Context, filePath string, key string) (e
}

// IsMetaFile returns whether the given path represents a meta file
func (XattrsBackend) IsMetaFile(path string) bool { return false }
func (XattrsBackend) IsMetaFile(path string) bool {
	// Only the dedicated lock files count as meta files here; the suffix is
	// the one LockfilePath appends.
	const lockSuffix = ".meta.lock"
	return strings.HasSuffix(path, lockSuffix)
}

// Purge purges the data of a given path.
// For the xattrs backend this is a no-op that always reports success.
func (XattrsBackend) Purge(path string) error {
	return nil
}
Expand All @@ -167,6 +168,9 @@ func (XattrsBackend) Rename(oldPath, newPath string) error { return nil }
// MetadataPath returns the path of the file holding the metadata for the given path.
// For the xattrs backend that is the given path itself, returned unchanged.
func (XattrsBackend) MetadataPath(path string) string {
	return path
}

// LockfilePath returns the location of the lock file for the given path:
// the path itself with the ".meta.lock" suffix appended.
func (XattrsBackend) LockfilePath(path string) string {
	const suffix = ".meta.lock"
	return path + suffix
}

func cleanupLockfile(f *lockedfile.File) {
_ = f.Close()
_ = os.Remove(f.Name())
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/utils/decomposedfs/revisions.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (fs *Decomposedfs) ListRevisions(ctx context.Context, ref *provider.Referen
np := n.InternalPath()
if items, err := filepath.Glob(np + node.RevisionIDDelimiter + "*"); err == nil {
for i := range items {
if fs.lu.MetadataBackend().IsMetaFile(items[i]) {
if fs.lu.MetadataBackend().IsMetaFile(items[i]) || strings.HasSuffix(items[i], ".lock") {
continue
}

Expand Down Expand Up @@ -237,7 +237,7 @@ func (fs *Decomposedfs) RestoreRevision(ctx context.Context, ref *provider.Refer
attributeName == prefixes.BlobsizeAttr
})
if err != nil {
return errtypes.InternalError("failed to copy blob xattrs to version node")
return errtypes.InternalError("failed to copy blob xattrs to version node: " + err.Error())
}

// remember mtime from node as new revision mtime
Expand All @@ -256,7 +256,7 @@ func (fs *Decomposedfs) RestoreRevision(ctx context.Context, ref *provider.Refer
attributeName == prefixes.BlobsizeAttr
})
if err != nil {
return errtypes.InternalError("failed to copy blob xattrs to old revision to node")
return errtypes.InternalError("failed to copy blob xattrs to old revision to node: " + err.Error())
}

revisionSize, err := fs.lu.MetadataBackend().GetInt64(ctx, restoredRevisionPath, prefixes.BlobsizeAttr)
Expand Down
14 changes: 2 additions & 12 deletions pkg/storage/utils/decomposedfs/tree/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
"github.com/cs3org/reva/v2/pkg/storage/utils/filelocks"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/google/uuid"
"github.com/pkg/errors"
Expand Down Expand Up @@ -750,17 +749,8 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err
// lock parent before reading treesize or tree time

_, subspan := tracer.Start(ctx, "lockedfile.OpenFile")
var parentFilename string
switch t.lookup.MetadataBackend().(type) {
case metadata.MessagePackBackend:
parentFilename = t.lookup.MetadataBackend().MetadataPath(n.ParentPath())
f, err = lockedfile.OpenFile(parentFilename, os.O_RDWR|os.O_CREATE, 0600)
case metadata.XattrsBackend:
// we have to use dedicated lockfiles to lock directories
// this only works because the xattr backend also locks folders with separate lock files
parentFilename = n.ParentPath() + filelocks.LockFileSuffix
f, err = lockedfile.OpenFile(parentFilename, os.O_RDWR|os.O_CREATE, 0600)
}
parentFilename := t.lookup.MetadataBackend().LockfilePath(n.ParentPath())
f, err = lockedfile.OpenFile(parentFilename, os.O_RDWR|os.O_CREATE, 0600)
subspan.End()
if err != nil {
sublog.Error().Err(err).
Expand Down
Loading

0 comments on commit 5e7f597

Please sign in to comment.