Skip to content

Commit

Permalink
Merge pull request #4494 from aduffeck/decomposed-based-posixfs
Browse files Browse the repository at this point in the history
Decomposed based posixfs
  • Loading branch information
aduffeck authored Feb 2, 2024
2 parents 72be9fe + d28d673 commit c7df4e7
Show file tree
Hide file tree
Showing 30 changed files with 1,957 additions and 112 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/posixfs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: Start implementation of a plain posix storage driver

We started to lay the groundwork for a new posixfs storage driver based on decomposedfs. We also refactored decomposedfs to be a bit more modular and cleaned up the initialization.

https://github.com/cs3org/reva/pull/4494
1 change: 1 addition & 0 deletions pkg/storage/fs/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
_ "github.com/cs3org/reva/v2/pkg/storage/fs/nextcloud"
_ "github.com/cs3org/reva/v2/pkg/storage/fs/ocis"
_ "github.com/cs3org/reva/v2/pkg/storage/fs/owncloudsql"
_ "github.com/cs3org/reva/v2/pkg/storage/fs/posix"
_ "github.com/cs3org/reva/v2/pkg/storage/fs/s3"
_ "github.com/cs3org/reva/v2/pkg/storage/fs/s3ng"
// Add your own here
Expand Down
122 changes: 122 additions & 0 deletions pkg/storage/fs/posix/blobstore/blobstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package blobstore

import (
"bufio"
"fmt"
"io"
"os"
"path/filepath"

"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/pkg/errors"
)

// Blobstore provides an interface to an filesystem based blobstore
type Blobstore struct {
root string
}

// New returns a new Blobstore
func New(root string) (*Blobstore, error) {
err := os.MkdirAll(root, 0700)
if err != nil {
return nil, err
}

return &Blobstore{
root: root,
}, nil
}

// Upload stores some data in the blobstore under the given key
func (bs *Blobstore) Upload(node *node.Node, source string) error {
dest, err := bs.path(node)
if err != nil {
return err
}
// ensure parent path exists
if err := os.MkdirAll(filepath.Dir(dest), 0700); err != nil {
return errors.Wrap(err, "Decomposedfs: oCIS blobstore: error creating parent folders for blob")
}

if err := os.Rename(source, dest); err == nil {
return nil
}

// Rename failed, file needs to be copied.
file, err := os.Open(source)
if err != nil {
return errors.Wrap(err, "Decomposedfs: oCIS blobstore: Can not open source file to upload")
}
defer file.Close()

f, err := os.OpenFile(dest, os.O_CREATE|os.O_WRONLY, 0700)
if err != nil {
return errors.Wrapf(err, "could not open blob '%s' for writing", dest)
}

w := bufio.NewWriter(f)
_, err = w.ReadFrom(file)
if err != nil {
return errors.Wrapf(err, "could not write blob '%s'", dest)
}

return w.Flush()
}

// Download retrieves a blob from the blobstore for reading
func (bs *Blobstore) Download(node *node.Node) (io.ReadCloser, error) {
dest, err := bs.path(node)
if err != nil {
return nil, err
}
file, err := os.Open(dest)
if err != nil {
return nil, errors.Wrapf(err, "could not read blob '%s'", dest)
}
return file, nil
}

// Delete deletes a blob from the blobstore
func (bs *Blobstore) Delete(node *node.Node) error {
dest, err := bs.path(node)
if err != nil {
return err
}
if err := utils.RemoveItem(dest); err != nil {
return errors.Wrapf(err, "could not delete blob '%s'", dest)
}
return nil
}

func (bs *Blobstore) path(node *node.Node) (string, error) {
if node.BlobID == "" {
return "", fmt.Errorf("blobstore: BlobID is empty")
}
return filepath.Join(
bs.root,
filepath.Clean(filepath.Join(
"/", "spaces", lookup.Pathify(node.SpaceID, 1, 2), "blobs", lookup.Pathify(node.BlobID, 4, 2)),
),
), nil
}
31 changes: 31 additions & 0 deletions pkg/storage/fs/posix/blobstore/blobstore_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package blobstore_test

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestBlobstore(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Blobstore Suite")
}
130 changes: 130 additions & 0 deletions pkg/storage/fs/posix/blobstore/blobstore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package blobstore_test

import (
"io"
"os"
"path"

"github.com/cs3org/reva/v2/pkg/storage/fs/ocis/blobstore"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/tests/helpers"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = Describe("Blobstore", func() {
var (
tmpRoot string
blobNode *node.Node
blobPath string
blobSrcFile string
data []byte

bs *blobstore.Blobstore
)

BeforeEach(func() {
var err error
tmpRoot, err = helpers.TempDir("reva-unit-tests-*-root")
Expect(err).ToNot(HaveOccurred())

data = []byte("1234567890")
blobNode = &node.Node{
SpaceID: "wonderfullspace",
BlobID: "huuuuugeblob",
}
blobPath = path.Join(tmpRoot, "spaces", "wo", "nderfullspace", "blobs", "hu", "uu", "uu", "ge", "blob")

blobSrcFile = path.Join(tmpRoot, "blobsrc")

bs, err = blobstore.New(path.Join(tmpRoot))
Expect(err).ToNot(HaveOccurred())
})

AfterEach(func() {
if tmpRoot != "" {
os.RemoveAll(tmpRoot)
}
})

It("creates the root directory if it doesn't exist", func() {
_, err := os.Stat(path.Join(tmpRoot))
Expect(err).ToNot(HaveOccurred())
})

Context("Blob upload", func() {
Describe("Upload", func() {
BeforeEach(func() {
Expect(os.WriteFile(blobSrcFile, data, 0700)).To(Succeed())
})
It("writes the blob", func() {
err := bs.Upload(blobNode, blobSrcFile)
Expect(err).ToNot(HaveOccurred())

writtenBytes, err := os.ReadFile(blobPath)
Expect(err).ToNot(HaveOccurred())
Expect(writtenBytes).To(Equal(data))
})
})
})

Context("with an existing blob", func() {
BeforeEach(func() {
Expect(os.MkdirAll(path.Dir(blobPath), 0700)).To(Succeed())
Expect(os.WriteFile(blobPath, data, 0700)).To(Succeed())
})

Describe("Download", func() {
It("cleans the key", func() {
reader, err := bs.Download(blobNode)
Expect(err).ToNot(HaveOccurred())

readData, err := io.ReadAll(reader)
Expect(err).ToNot(HaveOccurred())
Expect(readData).To(Equal(data))
})

It("returns a reader to the blob", func() {
reader, err := bs.Download(blobNode)
Expect(err).ToNot(HaveOccurred())

readData, err := io.ReadAll(reader)
Expect(err).ToNot(HaveOccurred())
Expect(readData).To(Equal(data))
})
})

Describe("Delete", func() {
It("deletes the blob", func() {
_, err := os.Stat(blobPath)
Expect(err).ToNot(HaveOccurred())

err = bs.Delete(blobNode)
Expect(err).ToNot(HaveOccurred())

_, err = os.Stat(blobPath)
Expect(err).To(HaveOccurred())
})
})
})

})
Loading

0 comments on commit c7df4e7

Please sign in to comment.