diff --git a/changelog/unreleased/fix-0-byte-msgpack.md b/changelog/unreleased/fix-0-byte-msgpack.md new file mode 100644 index 0000000000..da0cf11125 --- /dev/null +++ b/changelog/unreleased/fix-0-byte-msgpack.md @@ -0,0 +1,5 @@ +Bugfix: fix writing 0 byte msgpack metadata + +File metadata is now written atomically to be more resilient during timeouts + +https://github.com/cs3org/reva/pull/4033 diff --git a/go.mod b/go.mod index b586864a4a..538837166b 100644 --- a/go.mod +++ b/go.mod @@ -138,6 +138,7 @@ require ( github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/google/go-querystring v1.1.0 // indirect github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 // indirect + github.com/google/renameio/v2 v2.0.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.13.0 // indirect github.com/hashicorp/consul/api v1.15.2 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect diff --git a/go.sum b/go.sum index ea902352db..76f735197f 100644 --- a/go.sum +++ b/go.sum @@ -781,6 +781,8 @@ github.com/google/pprof v0.0.0-20210609004039-a478d1d731e9/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec= github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/renameio/v2 v2.0.0 h1:UifI23ZTGY8Tt29JbYFiuyIU3eX+RNFtUwefq9qAhxg= +github.com/google/renameio/v2 v2.0.0/go.mod h1:BtmJXm5YlszgC+TD4HOEEUFgkJP3nLxehU6hfe7jRt4= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= diff --git a/pkg/storage/utils/decomposedfs/lookup/lookup.go b/pkg/storage/utils/decomposedfs/lookup/lookup.go index 
1bf151449e..273e094f9d 100644 --- a/pkg/storage/utils/decomposedfs/lookup/lookup.go +++ b/pkg/storage/utils/decomposedfs/lookup/lookup.go @@ -284,7 +284,7 @@ func refFromCS3(b []byte) (*provider.Reference, error) { func (lu *Lookup) CopyMetadata(ctx context.Context, src, target string, filter func(attributeName string) bool) (err error) { // Acquire a read log on the source node // write lock existing node before reading treesize or tree time - f, err := lockedfile.Open(lu.MetadataBackend().MetadataPath(src)) + lock, err := lockedfile.OpenFile(lu.MetadataBackend().LockfilePath(src), os.O_RDONLY|os.O_CREATE, 0600) if err != nil { return err } @@ -293,7 +293,7 @@ func (lu *Lookup) CopyMetadata(ctx context.Context, src, target string, filter f return errors.Wrap(err, "xattrs: Unable to lock source to read") } defer func() { - rerr := f.Close() + rerr := lock.Close() // if err is non nil we do not overwrite that if err == nil { @@ -301,7 +301,7 @@ func (lu *Lookup) CopyMetadata(ctx context.Context, src, target string, filter f } }() - return lu.CopyMetadataWithSourceLock(ctx, src, target, filter, f) + return lu.CopyMetadataWithSourceLock(ctx, src, target, filter, lock) } // CopyMetadataWithSourceLock copies all extended attributes from source to target. 
@@ -312,11 +312,11 @@ func (lu *Lookup) CopyMetadataWithSourceLock(ctx context.Context, sourcePath, ta switch { case lockedSource == nil: return errors.New("no lock provided") - case lockedSource.File.Name() != lu.MetadataBackend().MetadataPath(sourcePath): + case lockedSource.File.Name() != lu.MetadataBackend().LockfilePath(sourcePath): return errors.New("lockpath does not match filepath") } - attrs, err := lu.metadataBackend.AllWithLockedSource(ctx, sourcePath, lockedSource) + attrs, err := lu.metadataBackend.All(ctx, sourcePath) if err != nil { return err } diff --git a/pkg/storage/utils/decomposedfs/metadata/messagepack_backend.go b/pkg/storage/utils/decomposedfs/metadata/messagepack_backend.go index 7e2137c2c4..9cfa629c10 100644 --- a/pkg/storage/utils/decomposedfs/metadata/messagepack_backend.go +++ b/pkg/storage/utils/decomposedfs/metadata/messagepack_backend.go @@ -20,7 +20,9 @@ package metadata import ( "context" + "errors" "io" + "io/fs" "os" "path/filepath" "strconv" @@ -28,6 +30,7 @@ import ( "time" "github.com/cs3org/reva/v2/pkg/storage/cache" + "github.com/google/renameio/v2" "github.com/pkg/xattr" "github.com/rogpeppe/go-internal/lockedfile" "github.com/shamaton/msgpack/v2" @@ -142,20 +145,17 @@ func (b MessagePackBackend) saveAttributes(ctx context.Context, path string, set span.End() }() + lockPath := b.LockfilePath(path) metaPath := b.MetadataPath(path) if acquireLock { _, subspan := tracer.Start(ctx, "lockedfile.OpenFile") - f, err = lockedfile.OpenFile(metaPath, os.O_RDWR|os.O_CREATE, 0600) - subspan.End() - } else { - _, subspan := tracer.Start(ctx, "os.OpenFile") - f, err = os.OpenFile(metaPath, os.O_RDWR|os.O_CREATE, 0600) + f, err = lockedfile.OpenFile(lockPath, os.O_RDWR|os.O_CREATE, 0600) subspan.End() - } - if err != nil { - return err - } - defer f.Close() + if err != nil { + return err + } + defer f.Close() // registered only after the lock file was opened successfully + } // Invalidate cache early _, subspan := tracer.Start(ctx, "metaCache.RemoveMetadata") @@ -163,53 +163,47 @@ func (b MessagePackBackend) saveAttributes(ctx context.Context,
path string, set subspan.End() // Read current state - _, subspan = tracer.Start(ctx, "io.ReadAll") + _, subspan = tracer.Start(ctx, "os.ReadFile") var msgBytes []byte - msgBytes, err = io.ReadAll(f) + msgBytes, err = os.ReadFile(metaPath) subspan.End() - if err != nil { - return err - } attribs := map[string][]byte{} - if len(msgBytes) > 0 { + switch { + case err != nil: + if !errors.Is(err, fs.ErrNotExist) { + return err + } + case len(msgBytes) == 0: + // ugh. an empty file? bail out + return errors.New("encountered empty metadata file") + default: + // only unmarshal if we read data err = msgpack.Unmarshal(msgBytes, &attribs) if err != nil { return err } } - // set new metadata + // prepare metadata for key, val := range setAttribs { attribs[key] = val } for _, key := range deleteAttribs { delete(attribs, key) } - - // Truncate file - _, err = f.Seek(0, io.SeekStart) - if err != nil { - return err - } - _, subspan = tracer.Start(ctx, "f.Truncate") - err = f.Truncate(0) - subspan.End() - if err != nil { - return err - } - - // Write new metadata to file var d []byte d, err = msgpack.Marshal(attribs) if err != nil { return err } - _, subspan = tracer.Start(ctx, "f.Write") - _, err = f.Write(d) - subspan.End() + + // overwrite file atomically + _, subspan = tracer.Start(ctx, "renameio.WriteFile") + err = renameio.WriteFile(metaPath, d, 0600) + subspan.End() // end the span before the error check so it is also closed when the write fails if err != nil { return err } _, subspan = tracer.Start(ctx, "metaCache.PushToCache") err = b.metaCache.PushToCache(b.cacheKey(path), attribs) @@ -227,9 +221,13 @@ func (b MessagePackBackend) loadAttributes(ctx context.Context, path string, sou } metaPath := b.MetadataPath(path) + var msgBytes []byte + if source == nil { - _, subspan := tracer.Start(ctx, "lockedfile.Open") - source, err = lockedfile.Open(metaPath) + // // No cached entry found.
Read from storage and store in cache + _, subspan := tracer.Start(ctx, "os.OpenFile") + // source, err = lockedfile.Open(metaPath) + source, err = os.Open(metaPath) subspan.End() // // No cached entry found. Read from storage and store in cache if err != nil { @@ -246,12 +244,16 @@ func (b MessagePackBackend) loadAttributes(ctx context.Context, path string, sou return attribs, nil // no attributes set yet } } - defer source.(*lockedfile.File).Close() + _, subspan = tracer.Start(ctx, "io.ReadAll") + msgBytes, err = io.ReadAll(source) + source.(*os.File).Close() + subspan.End() + } else { + _, subspan := tracer.Start(ctx, "io.ReadAll") + msgBytes, err = io.ReadAll(source) + subspan.End() } - _, subspan := tracer.Start(ctx, "io.ReadAll") - msgBytes, err := io.ReadAll(source) - subspan.End() if err != nil { return nil, err } @@ -262,7 +264,7 @@ func (b MessagePackBackend) loadAttributes(ctx context.Context, path string, sou } } - _, subspan = tracer.Start(ctx, "metaCache.PushToCache") + _, subspan := tracer.Start(ctx, "metaCache.PushToCache") err = b.metaCache.PushToCache(b.cacheKey(path), attribs) subspan.End() if err != nil { @@ -273,7 +275,9 @@ func (b MessagePackBackend) loadAttributes(ctx context.Context, path string, sou } // IsMetaFile returns whether the given path represents a meta file -func (MessagePackBackend) IsMetaFile(path string) bool { return strings.HasSuffix(path, ".mpk") } +func (MessagePackBackend) IsMetaFile(path string) bool { + return strings.HasSuffix(path, ".mpk") || strings.HasSuffix(path, ".mpk.lock") +} // Purge purges the data of a given path func (b MessagePackBackend) Purge(path string) error { @@ -304,6 +308,9 @@ func (b MessagePackBackend) Rename(oldPath, newPath string) error { // MetadataPath returns the path of the file holding the metadata for the given path func (MessagePackBackend) MetadataPath(path string) string { return path + ".mpk" } +// LockfilePath returns the path of the lock file +func (MessagePackBackend) 
LockfilePath(path string) string { return path + ".mpk.lock" } + func (b MessagePackBackend) cacheKey(path string) string { // rootPath is guaranteed to have no trailing slash // the cache key shouldn't begin with a slash as some stores drop it which can cause diff --git a/pkg/storage/utils/decomposedfs/metadata/metadata.go b/pkg/storage/utils/decomposedfs/metadata/metadata.go index c3e37ea074..243895b364 100644 --- a/pkg/storage/utils/decomposedfs/metadata/metadata.go +++ b/pkg/storage/utils/decomposedfs/metadata/metadata.go @@ -52,6 +52,7 @@ type Backend interface { Rename(oldPath, newPath string) error IsMetaFile(path string) bool MetadataPath(path string) string + LockfilePath(path string) string AllWithLockedSource(ctx context.Context, path string, source io.Reader) (map[string][]byte, error) } @@ -110,6 +111,9 @@ func (NullBackend) Rename(oldPath, newPath string) error { return errUnconfigure // MetadataPath returns the path of the file holding the metadata for the given path func (NullBackend) MetadataPath(path string) string { return "" } +// LockfilePath returns the path of the lock file +func (NullBackend) LockfilePath(path string) string { return "" } + // AllWithLockedSource reads all extended attributes from the given reader // The path argument is used for storing the data in the cache func (NullBackend) AllWithLockedSource(ctx context.Context, path string, source io.Reader) (map[string][]byte, error) { diff --git a/pkg/storage/utils/decomposedfs/metadata/metadata_test.go b/pkg/storage/utils/decomposedfs/metadata/metadata_test.go index eb9741ff4e..4cd0dad629 100644 --- a/pkg/storage/utils/decomposedfs/metadata/metadata_test.go +++ b/pkg/storage/utils/decomposedfs/metadata/metadata_test.go @@ -31,9 +31,8 @@ import ( var _ = Describe("Backend", func() { var ( - tmpdir string - file string - metafile string + tmpdir string + file string backend metadata.Backend ) @@ -46,9 +45,6 @@ var _ = Describe("Backend", func() { JustBeforeEach(func() { file = 
path.Join(tmpdir, "file") - metafile = backend.MetadataPath(file) - _, err := os.Create(metafile) - Expect(err).ToNot(HaveOccurred()) }) AfterEach(func() { @@ -147,10 +143,9 @@ var _ = Describe("Backend", func() { Expect(v["bar"]).To(Equal([]byte("baz"))) }) - It("returns an empty map", func() { - v, err := backend.All(context.Background(), file) - Expect(err).ToNot(HaveOccurred()) - Expect(v).To(Equal(map[string][]byte{})) + It("fails when the metafile does not exist", func() { + _, err := backend.All(context.Background(), file) + Expect(err).To(HaveOccurred()) }) }) @@ -165,10 +160,9 @@ var _ = Describe("Backend", func() { Expect(v).To(ConsistOf("foo", "bar")) }) - It("returns an empty list", func() { - v, err := backend.List(context.Background(), file) - Expect(err).ToNot(HaveOccurred()) - Expect(v).To(Equal([]string{})) + It("fails when the metafile does not exist", func() { + _, err := backend.List(context.Background(), file) + Expect(err).To(HaveOccurred()) }) }) diff --git a/pkg/storage/utils/decomposedfs/metadata/xattrs_backend.go b/pkg/storage/utils/decomposedfs/metadata/xattrs_backend.go index 5a402f6817..92462f89d2 100644 --- a/pkg/storage/utils/decomposedfs/metadata/xattrs_backend.go +++ b/pkg/storage/utils/decomposedfs/metadata/xattrs_backend.go @@ -24,6 +24,7 @@ import ( "os" "path/filepath" "strconv" + "strings" "github.com/cs3org/reva/v2/pkg/storage/utils/filelocks" "github.com/pkg/errors" @@ -156,7 +157,7 @@ func (XattrsBackend) Remove(ctx context.Context, filePath string, key string) (e } // IsMetaFile returns whether the given path represents a meta file -func (XattrsBackend) IsMetaFile(path string) bool { return false } +func (XattrsBackend) IsMetaFile(path string) bool { return strings.HasSuffix(path, ".meta.lock") } // Purge purges the data of a given path func (XattrsBackend) Purge(path string) error { return nil } @@ -167,6 +168,9 @@ func (XattrsBackend) Rename(oldPath, newPath string) error { return nil } // MetadataPath returns the path of 
the file holding the metadata for the given path func (XattrsBackend) MetadataPath(path string) string { return path } +// LockfilePath returns the path of the lock file +func (XattrsBackend) LockfilePath(path string) string { return path + ".meta.lock" } + func cleanupLockfile(f *lockedfile.File) { _ = f.Close() _ = os.Remove(f.Name()) diff --git a/pkg/storage/utils/decomposedfs/revisions.go b/pkg/storage/utils/decomposedfs/revisions.go index 6bde18bd06..1475483994 100644 --- a/pkg/storage/utils/decomposedfs/revisions.go +++ b/pkg/storage/utils/decomposedfs/revisions.go @@ -70,7 +70,7 @@ func (fs *Decomposedfs) ListRevisions(ctx context.Context, ref *provider.Referen np := n.InternalPath() if items, err := filepath.Glob(np + node.RevisionIDDelimiter + "*"); err == nil { for i := range items { - if fs.lu.MetadataBackend().IsMetaFile(items[i]) { + if fs.lu.MetadataBackend().IsMetaFile(items[i]) || strings.HasSuffix(items[i], ".lock") { continue } @@ -237,7 +237,7 @@ func (fs *Decomposedfs) RestoreRevision(ctx context.Context, ref *provider.Refer attributeName == prefixes.BlobsizeAttr }) if err != nil { - return errtypes.InternalError("failed to copy blob xattrs to version node") + return errtypes.InternalError("failed to copy blob xattrs to version node: " + err.Error()) } // remember mtime from node as new revision mtime @@ -256,7 +256,7 @@ func (fs *Decomposedfs) RestoreRevision(ctx context.Context, ref *provider.Refer attributeName == prefixes.BlobsizeAttr }) if err != nil { - return errtypes.InternalError("failed to copy blob xattrs to old revision to node") + return errtypes.InternalError("failed to copy blob xattrs to old revision to node: " + err.Error()) } revisionSize, err := fs.lu.MetadataBackend().GetInt64(ctx, restoredRevisionPath, prefixes.BlobsizeAttr) diff --git a/pkg/storage/utils/decomposedfs/tree/tree.go b/pkg/storage/utils/decomposedfs/tree/tree.go index f809eec856..2613033a1e 100644 --- a/pkg/storage/utils/decomposedfs/tree/tree.go +++ 
b/pkg/storage/utils/decomposedfs/tree/tree.go @@ -40,7 +40,6 @@ import ( "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" - "github.com/cs3org/reva/v2/pkg/storage/utils/filelocks" "github.com/cs3org/reva/v2/pkg/utils" "github.com/google/uuid" "github.com/pkg/errors" @@ -750,17 +749,8 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err // lock parent before reading treesize or tree time _, subspan := tracer.Start(ctx, "lockedfile.OpenFile") - var parentFilename string - switch t.lookup.MetadataBackend().(type) { - case metadata.MessagePackBackend: - parentFilename = t.lookup.MetadataBackend().MetadataPath(n.ParentPath()) - f, err = lockedfile.OpenFile(parentFilename, os.O_RDWR|os.O_CREATE, 0600) - case metadata.XattrsBackend: - // we have to use dedicated lockfiles to lock directories - // this only works because the xattr backend also locks folders with separate lock files - parentFilename = n.ParentPath() + filelocks.LockFileSuffix - f, err = lockedfile.OpenFile(parentFilename, os.O_RDWR|os.O_CREATE, 0600) - } + parentFilename := t.lookup.MetadataBackend().LockfilePath(n.ParentPath()) + f, err = lockedfile.OpenFile(parentFilename, os.O_RDWR|os.O_CREATE, 0600) subspan.End() if err != nil { sublog.Error().Err(err). 
diff --git a/pkg/storage/utils/decomposedfs/upload/processing.go b/pkg/storage/utils/decomposedfs/upload/processing.go index 41ac0e4345..08bba8f996 100644 --- a/pkg/storage/utils/decomposedfs/upload/processing.go +++ b/pkg/storage/utils/decomposedfs/upload/processing.go @@ -38,7 +38,6 @@ import ( "github.com/cs3org/reva/v2/pkg/logger" "github.com/cs3org/reva/v2/pkg/storage/utils/chunking" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" - "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" @@ -329,23 +328,17 @@ func initNewNode(upload *Upload, n *node.Node, fsize uint64) (*lockedfile.File, } // create and write lock new node metadata - f, err := lockedfile.OpenFile(upload.lu.MetadataBackend().MetadataPath(n.InternalPath()), os.O_RDWR|os.O_CREATE, 0600) + f, err := lockedfile.OpenFile(upload.lu.MetadataBackend().LockfilePath(n.InternalPath()), os.O_RDWR|os.O_CREATE, 0600) if err != nil { return nil, err } - switch upload.lu.MetadataBackend().(type) { - case metadata.MessagePackBackend: - // for the ini and metadata backend we also need to touch the actual node file here. 
- // it stores the mtime of the resource, which must not change when we update the ini file - h, err := os.OpenFile(n.InternalPath(), os.O_CREATE, 0600) - if err != nil { - return f, err - } - h.Close() - case metadata.XattrsBackend: - // nothing to do + // we also need to touch the actual node file here: it stores the mtime of the resource + h, err := os.OpenFile(n.InternalPath(), os.O_CREATE, 0600) + if err != nil { + return f, err } + h.Close() if _, err := node.CheckQuota(upload.Ctx, n.SpaceRoot, false, 0, fsize); err != nil { return f, err @@ -403,7 +396,7 @@ func updateExistingNode(upload *Upload, n *node.Node, spaceID string, fsize uint targetPath := n.InternalPath() // write lock existing node before reading treesize or tree time - f, err := lockedfile.OpenFile(upload.lu.MetadataBackend().MetadataPath(targetPath), os.O_RDWR, 0600) + f, err := lockedfile.OpenFile(upload.lu.MetadataBackend().LockfilePath(targetPath), os.O_RDWR|os.O_CREATE, 0600) if err != nil { return nil, err }