Skip to content

Commit

Permalink
Merge pull request #3739 from dragotin/useRenameInUpload
Browse files Browse the repository at this point in the history
Try to rename the file after upload instead of copying.
  • Loading branch information
Klaas Freitag authored Mar 20, 2023
2 parents 3c11349 + 753386c commit d521bbf
Show file tree
Hide file tree
Showing 11 changed files with 70 additions and 56 deletions.
6 changes: 6 additions & 0 deletions changelog/unreleased/use-rename-after-upload.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Enhancement: Try to rename uploaded files to their final position

Before files were always copied which is a performance drop if rename can be done. If not, fallback to copy is happening.

https://github.com/cs3org/reva/pull/3739

20 changes: 16 additions & 4 deletions pkg/storage/fs/ocis/blobstore/blobstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,32 @@ func New(root string) (*Blobstore, error) {
}

// Upload stores some data in the blobstore under the given key
func (bs *Blobstore) Upload(node *node.Node, data io.Reader) error {
func (bs *Blobstore) Upload(node *node.Node, source string) error {

dest := bs.path(node)
// ensure parent path exists
if err := os.MkdirAll(filepath.Dir(bs.path(node)), 0700); err != nil {
if err := os.MkdirAll(filepath.Dir(dest), 0700); err != nil {
return errors.Wrap(err, "Decomposedfs: oCIS blobstore: error creating parent folders for blob")
}

f, err := os.OpenFile(bs.path(node), os.O_CREATE|os.O_WRONLY, 0700)
if err := os.Rename(source, dest); err == nil {
return nil
}

// Rename failed, file needs to be copied.
file, err := os.Open(source)
if err != nil {
return errors.Wrap(err, "Decomposedfs: oCIS blobstore: Can not open source file to upload")
}
defer file.Close()

f, err := os.OpenFile(dest, os.O_CREATE|os.O_WRONLY, 0700)
if err != nil {
return errors.Wrapf(err, "could not open blob '%s' for writing", bs.path(node))
}

w := bufio.NewWriter(f)
_, err = w.ReadFrom(data)
_, err = w.ReadFrom(file)
if err != nil {
return errors.Wrapf(err, "could not write blob '%s'", bs.path(node))
}
Expand Down
31 changes: 19 additions & 12 deletions pkg/storage/fs/ocis/blobstore/blobstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package blobstore_test

import (
"bytes"
"io"
"os"
"path"
Expand All @@ -34,10 +33,11 @@ import (

var _ = Describe("Blobstore", func() {
var (
tmpRoot string
blobNode *node.Node
blobPath string
data []byte
tmpRoot string
blobNode *node.Node
blobPath string
blobSrcFile string
data []byte

bs *blobstore.Blobstore
)
Expand All @@ -54,6 +54,8 @@ var _ = Describe("Blobstore", func() {
}
blobPath = path.Join(tmpRoot, "spaces", "wo", "nderfullspace", "blobs", "hu", "uu", "uu", "ge", "blob")

blobSrcFile = path.Join(tmpRoot, "blobsrc")

bs, err = blobstore.New(path.Join(tmpRoot))
Expect(err).ToNot(HaveOccurred())
})
Expand All @@ -69,14 +71,19 @@ var _ = Describe("Blobstore", func() {
Expect(err).ToNot(HaveOccurred())
})

Describe("Upload", func() {
It("writes the blob", func() {
err := bs.Upload(blobNode, bytes.NewReader(data))
Expect(err).ToNot(HaveOccurred())
Context("Blob upload", func() {
Describe("Upload", func() {
BeforeEach(func() {
Expect(os.WriteFile(blobSrcFile, data, 0700)).To(Succeed())
})
It("writes the blob", func() {
err := bs.Upload(blobNode, blobSrcFile)
Expect(err).ToNot(HaveOccurred())

writtenBytes, err := os.ReadFile(blobPath)
Expect(err).ToNot(HaveOccurred())
Expect(writtenBytes).To(Equal(data))
writtenBytes, err := os.ReadFile(blobPath)
Expect(err).ToNot(HaveOccurred())
Expect(writtenBytes).To(Equal(data))
})
})
})

Expand Down
17 changes: 7 additions & 10 deletions pkg/storage/fs/s3ng/blobstore/blobstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,16 @@ func New(endpoint, region, bucket, accessKey, secretKey string) (*Blobstore, err
}

// Upload stores some data in the blobstore under the given key
func (bs *Blobstore) Upload(node *node.Node, reader io.Reader) error {
size := int64(-1)
if file, ok := reader.(*os.File); ok {
info, err := file.Stat()
if err != nil {
return errors.Wrapf(err, "could not determine file size for object '%s'", bs.path(node))
}
size = info.Size()
func (bs *Blobstore) Upload(node *node.Node, source string) error {
reader, err := os.Open(source)
if err != nil {
return errors.Wrap(err, "can not open source file to upload")
}
defer reader.Close()

_, err := bs.client.PutObject(context.Background(), bs.bucket, bs.path(node), reader, size, minio.PutObjectOptions{ContentType: "application/octet-stream"})
_, err1 := bs.client.PutObject(context.Background(), bs.bucket, bs.path(node), reader, node.Blobsize, minio.PutObjectOptions{ContentType: "application/octet-stream"})

if err != nil {
if err1 != nil {
return errors.Wrapf(err, "could not store object '%s' into bucket '%s'", bs.path(node), bs.bucket)
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/utils/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type Tree interface {
RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPath string, target *node.Node) (*node.Node, *node.Node, func() error, error)
PurgeRecycleItemFunc(ctx context.Context, spaceid, key, purgePath string) (*node.Node, func() error, error)

WriteBlob(node *node.Node, reader io.Reader) error
WriteBlob(node *node.Node, source string) error
ReadBlob(node *node.Node) (io.ReadCloser, error)
DeleteBlob(node *node.Node) error

Expand Down
10 changes: 5 additions & 5 deletions pkg/storage/utils/decomposedfs/mocks/Tree.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions pkg/storage/utils/decomposedfs/tree/mocks/Blobstore.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions pkg/storage/utils/decomposedfs/tree/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ import (

// Blobstore defines an interface for storing blobs in a blobstore
type Blobstore interface {
Upload(node *node.Node, reader io.Reader) error
Upload(node *node.Node, source string) error
Download(node *node.Node) (io.ReadCloser, error)
Delete(node *node.Node) error
}
Expand Down Expand Up @@ -821,8 +821,8 @@ func (t *Tree) calculateTreeSize(ctx context.Context, childrenPath string) (uint
}

// WriteBlob writes a blob to the blobstore
func (t *Tree) WriteBlob(node *node.Node, reader io.Reader) error {
return t.blobstore.Upload(node, reader)
func (t *Tree) WriteBlob(node *node.Node, source string) error {
return t.blobstore.Upload(node, source)
}

// ReadBlob reads a blob from the blobstore
Expand Down
9 changes: 2 additions & 7 deletions pkg/storage/utils/decomposedfs/upload/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type Tree interface {
RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPath string, target *node.Node) (*node.Node, *node.Node, func() error, error)
PurgeRecycleItemFunc(ctx context.Context, spaceid, key, purgePath string) (*node.Node, func() error, error)

WriteBlob(node *node.Node, reader io.Reader) error
WriteBlob(node *node.Node, binPath string) error
ReadBlob(node *node.Node) (io.ReadCloser, error)
DeleteBlob(node *node.Node) error

Expand Down Expand Up @@ -335,13 +335,8 @@ func (upload *Upload) Finalize() (err error) {
}

// upload the data to the blobstore
file, err := os.Open(upload.binPath)
if err != nil {
return err
}
defer file.Close()

if err := upload.tp.WriteBlob(n, file); err != nil {
if err := upload.tp.WriteBlob(n, upload.binPath); err != nil {
return errors.Wrap(err, "failed to upload file to blostore")
}

Expand Down
5 changes: 2 additions & 3 deletions pkg/storage/utils/decomposedfs/upload_async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,10 @@ var _ = Describe("Async file uploads", Ordered, func() {
Expect(err).ToNot(HaveOccurred())
ref.ResourceId = &resID

bs.On("Upload", mock.AnythingOfType("*node.Node"), mock.AnythingOfType("*os.File"), mock.Anything).
bs.On("Upload", mock.AnythingOfType("*node.Node"), mock.AnythingOfType("string"), mock.Anything).
Return(nil).
Run(func(args mock.Arguments) {
reader := args.Get(1).(io.Reader)
data, err := io.ReadAll(reader)
data, err := os.ReadFile(args.Get(1).(string))

Expect(err).ToNot(HaveOccurred())
Expect(data).To(Equal(fileContent))
Expand Down
10 changes: 4 additions & 6 deletions pkg/storage/utils/decomposedfs/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,10 @@ var _ = Describe("File uploads", func() {

uploadRef := &provider.Reference{Path: "/" + uploadIds["simple"]}

bs.On("Upload", mock.AnythingOfType("*node.Node"), mock.AnythingOfType("*os.File"), mock.Anything).
bs.On("Upload", mock.AnythingOfType("*node.Node"), mock.AnythingOfType("string"), mock.Anything).
Return(nil).
Run(func(args mock.Arguments) {
reader := args.Get(1).(io.Reader)
data, err := io.ReadAll(reader)
data, err := os.ReadFile(args.Get(1).(string))

Expect(err).ToNot(HaveOccurred())
Expect(data).To(Equal([]byte("0123456789")))
Expand Down Expand Up @@ -283,11 +282,10 @@ var _ = Describe("File uploads", func() {

uploadRef := &provider.Reference{Path: "/" + uploadIds["simple"]}

bs.On("Upload", mock.AnythingOfType("*node.Node"), mock.AnythingOfType("*os.File"), mock.Anything).
bs.On("Upload", mock.AnythingOfType("*node.Node"), mock.AnythingOfType("string"), mock.Anything).
Return(nil).
Run(func(args mock.Arguments) {
reader := args.Get(1).(io.Reader)
data, err := io.ReadAll(reader)
data, err := os.ReadFile(args.Get(1).(string))

Expect(err).ToNot(HaveOccurred())
Expect(data).To(Equal([]byte("")))
Expand Down

0 comments on commit d521bbf

Please sign in to comment.