Skip to content

Commit

Permalink
use tusd datastorages
Browse files Browse the repository at this point in the history
Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

filter metadata

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

calculate sizediff on demand

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

fix empty bucketid

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

add changelog

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

drop unnecessary flag from cleanup

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

fix tests

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

drop unused variable

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

fix unit tests

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

move Finalize to upload package

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

move more functions to upload package

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

propagate sizediff on BytesReady and Revert on failure

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

reuse tus datastore from fs

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

always set file length when initiating uploads

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

log error if we cannot terminate upload

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

make linter happy

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

always send upload length

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

use s3ng config for datastore as well

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

allow configuring upload object prefixes and temp dir

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

fix s3ng test config

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

fix chunking v1

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

fix chunking v1 --sign

open existing file

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

restore size diff calculation

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

treat 0 byte uploads as size deferred

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

return empty reader for 0 byte files

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

comment change that always sends filesize in upload opaque

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

remove commented code

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

fix legacy chunking on s3

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

comment size to check if that causes nc tests to fail

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

run single failing test

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

drop unused option

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

ignore mlock files when iterating revisions

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>
  • Loading branch information
butonic committed Nov 1, 2023
1 parent 9f40f04 commit 1293660
Show file tree
Hide file tree
Showing 28 changed files with 1,522 additions and 1,324 deletions.
1 change: 1 addition & 0 deletions .drone.star
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,7 @@ def s3ngIntegrationTests(parallelRuns, skipExceptParts = []):
"DIVIDE_INTO_NUM_PARTS": parallelRuns,
"RUN_PART": runPart,
"EXPECTED_FAILURES_FILE": "/drone/src/tests/acceptance/expected-failures-on-S3NG-storage.md",
"BEHAT_FEATURE": "tests/acceptance/features/coreApiTrashbin/trashbinDelete.feature:17",
},
},
],
Expand Down
5 changes: 5 additions & 0 deletions changelog/unreleased/tusd-data-storage.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: Use Tusd data storage implementations

Decomposedfs now uses the data store implementation for uploads that comes with tusd instead of implementing the interface itself. This allows storing uploads directly in s3. When all bytes are transferred, tusd will call `PreFinishResponseCallback` if the storage driver implements it.

https://github.com/cs3org/reva/pull/4148
9 changes: 9 additions & 0 deletions internal/grpc/services/storageprovider/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,15 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate
}, nil
}

// FIXME: This is a hack to transport more metadata to the storage.FS InitiateUpload implementation
// we should use a request object that can carry
// * if-match
// * if-unmodified-since
// * uploadLength from the tus Upload-Length header
// * checksum from the tus Upload-Checksum header
// * mtime from the X-OC-Mtime header
// * expires from the s.conf.UploadExpiration ... should that not be part of the driver?
// * providerID
metadata := map[string]string{}
ifMatch := req.GetIfMatch()
if ifMatch != "" {
Expand Down
4 changes: 4 additions & 0 deletions internal/http/services/dataprovider/dataprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,11 @@ func getDataTXs(c *config, fs storage.FS, publisher events.Publisher) (map[strin
if tx, err := f(c.DataTXs[t], publisher); err == nil {
if handler, err := tx.Handler(fs); err == nil {
txs[t] = handler
} else {
return nil, err
}
} else {
return nil, err
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions internal/http/services/owncloud/ocdav/tus.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ func (s *svc) handleSpacesTusPost(w http.ResponseWriter, r *http.Request, spaceI

sublog := appctx.GetLogger(ctx).With().Str("spaceid", spaceID).Str("path", r.URL.Path).Logger()

// use filename to build a storage space reference
// but what if the upload happens directly to the resourceid .. and filename is empty?
// currently there is always a validator that requires that the filename is not empty ...
// hm -> bug: clients currently cannot POST to an existing source with a resource id only
ref, err := spacelookup.MakeStorageSpaceReference(spaceID, path.Join(r.URL.Path, meta["filename"]))
if err != nil {
w.WriteHeader(http.StatusBadRequest)
Expand Down
44 changes: 44 additions & 0 deletions pkg/rhttp/datatx/manager/tus/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package tus

import (
"net/http"
"strings"

tusd "github.com/tus/tusd/pkg/handler"
)

// FilterResponseWriter wraps an http.ResponseWriter so that, when the status
// line is written, the Upload-Metadata response header can be filtered down
// to client-facing (tus-prefixed) entries only. All other methods delegate
// directly to the wrapped writer.
type FilterResponseWriter struct {
	w http.ResponseWriter
}

// TusPrefix marks metadata entries that are meant for tus clients and are
// passed through (with the prefix stripped) in responses.
const TusPrefix = "tus."

// CS3Prefix marks internal cs3 metadata entries that must not be exposed to
// clients.
const CS3Prefix = "cs3."

// NewFilterResponseWriter returns a FilterResponseWriter wrapping w.
func NewFilterResponseWriter(w http.ResponseWriter) *FilterResponseWriter {
	// Note: no separate header map is kept; the wrapped writer's header map
	// is used (and filtered in place) when WriteHeader is called.
	return &FilterResponseWriter{w: w}
}

// Header returns the header map of the wrapped ResponseWriter, so callers
// mutate the real response headers directly.
func (fw *FilterResponseWriter) Header() http.Header {
	return fw.w.Header()
}

// Write passes the response body through to the wrapped ResponseWriter
// unchanged.
func (fw *FilterResponseWriter) Write(b []byte) (int, error) {
	return fw.w.Write(b)
}

// WriteHeader filters the Upload-Metadata header just before the status line
// is sent: only entries carrying TusPrefix survive, with the prefix stripped;
// everything else (in particular cs3-internal metadata) is dropped. The
// filtered header and the status code are then forwarded to the wrapped
// ResponseWriter.
func (fw *FilterResponseWriter) WriteHeader(statusCode int) {
	header := fw.w.Header()
	filtered := make(map[string]string)
	for key, value := range tusd.ParseMetadataHeader(header.Get("Upload-Metadata")) {
		if !strings.HasPrefix(key, TusPrefix) {
			continue
		}
		filtered[strings.TrimPrefix(key, TusPrefix)] = value
	}
	header.Set("Upload-Metadata", tusd.SerializeMetadataHeader(filtered))
	fw.w.WriteHeader(statusCode)
}
103 changes: 73 additions & 30 deletions pkg/rhttp/datatx/manager/tus/tus.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,26 +78,47 @@ func New(m map[string]interface{}, publisher events.Publisher) (datatx.DataTX, e
}

func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
composable, ok := fs.(composable)
if !ok {
return nil, errtypes.NotSupported("file system does not support the tus protocol")
}

// A storage backend for tusd may consist of multiple different parts which
// handle upload creation, locking, termination and so on. The composer is a
// place where all those separated pieces are joined together. In this example
// we only use the file store but you may plug in multiple.
composer := tusd.NewStoreComposer()

// let the composable storage tell tus which extensions it supports
composable.UseIn(composer)

config := tusd.Config{
StoreComposer: composer,
StoreComposer: composer,
PreUploadCreateCallback: func(hook tusd.HookEvent) error {
return errors.New("uploads must be created with a cs3 InitiateUpload call")
},
NotifyCompleteUploads: true,
Logger: log.New(appctx.GetLogger(context.Background()), "", 0),
}

var dataStore tusd.DataStore

cb, ok := fs.(hasTusDatastore)
if ok {
dataStore = cb.GetDataStore()
composable, ok := dataStore.(composable)
if !ok {
return nil, errtypes.NotSupported("tus datastore is not composable")
}
composable.UseIn(composer)
config.PreFinishResponseCallback = cb.PreFinishResponseCallback
} else {
composable, ok := fs.(composable)
if !ok {
return nil, errtypes.NotSupported("file system does not support the tus protocol")
}

// let the composable storage tell tus which extensions it supports
composable.UseIn(composer)
dataStore, ok = fs.(tusd.DataStore)
if !ok {
return nil, errtypes.NotSupported("file system does not support the tus datastore")
}
}

handler, err := tusd.NewUnroutedHandler(config)
if err != nil {
return nil, err
Expand All @@ -108,30 +129,37 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
ev := <-handler.CompleteUploads
info := ev.Upload
spaceOwner := &userv1beta1.UserId{
OpaqueId: info.Storage["SpaceOwnerOrManager"],
OpaqueId: info.MetaData[CS3Prefix+"SpaceOwnerOrManager"],
}
owner := &userv1beta1.UserId{
Idp: info.Storage["Idp"],
OpaqueId: info.Storage["UserId"],
executant := &userv1beta1.UserId{
Type: userv1beta1.UserType(userv1beta1.UserType_value[info.MetaData[CS3Prefix+"ExecutantType"]]),
Idp: info.MetaData[CS3Prefix+"ExecutantIdp"],
OpaqueId: info.MetaData[CS3Prefix+"ExecutantId"],
}
ref := &provider.Reference{
ResourceId: &provider.ResourceId{
StorageId: info.MetaData["providerID"],
SpaceId: info.Storage["SpaceRoot"],
OpaqueId: info.Storage["SpaceRoot"],
StorageId: info.MetaData[CS3Prefix+"providerID"],
SpaceId: info.MetaData[CS3Prefix+"SpaceRoot"],
OpaqueId: info.MetaData[CS3Prefix+"SpaceRoot"],
},
Path: utils.MakeRelativePath(filepath.Join(info.MetaData["dir"], info.MetaData["filename"])),
// FIXME this seems wrong, path is not really relative to space root
// actually it is: InitiateUpload calls fs.lu.Path to get the path relative to the root...
// hm is that robust? what if the file is moved? shouldn't we store the parent id, then?
Path: utils.MakeRelativePath(filepath.Join(info.MetaData[CS3Prefix+"dir"], info.MetaData[CS3Prefix+"filename"])),
}
datatx.InvalidateCache(owner, ref, m.statCache)
datatx.InvalidateCache(executant, ref, m.statCache)
if m.publisher != nil {
if err := datatx.EmitFileUploadedEvent(spaceOwner, owner, ref, m.publisher); err != nil {
if err := datatx.EmitFileUploadedEvent(spaceOwner, executant, ref, m.publisher); err != nil {
appctx.GetLogger(context.Background()).Error().Err(err).Msg("failed to publish FileUploaded event")
}
}
}
}()

h := handler.Middleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// filter metadata headers
w = NewFilterResponseWriter(w)

method := r.Method
// https://github.com/tus/tus-resumable-upload-protocol/blob/master/protocol.md#x-http-method-override
if r.Header.Get("X-HTTP-Method-Override") != "" {
Expand All @@ -145,7 +173,7 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
metrics.UploadsActive.Sub(1)
}()
// set etag, mtime and file id
setHeaders(fs, w, r)
setHeaders(dataStore, w, r)
handler.PostFile(w, r)
case "HEAD":
handler.HeadFile(w, r)
Expand All @@ -155,7 +183,7 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
metrics.UploadsActive.Sub(1)
}()
// set etag, mtime and file id
setHeaders(fs, w, r)
setHeaders(dataStore, w, r)
handler.PatchFile(w, r)
case "DELETE":
handler.DelFile(w, r)
Expand All @@ -182,14 +210,14 @@ type composable interface {
UseIn(composer *tusd.StoreComposer)
}

func setHeaders(fs storage.FS, w http.ResponseWriter, r *http.Request) {
type hasTusDatastore interface {
PreFinishResponseCallback(hook tusd.HookEvent) error
GetDataStore() tusd.DataStore
}

func setHeaders(datastore tusd.DataStore, w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
id := path.Base(r.URL.Path)
datastore, ok := fs.(tusd.DataStore)
if !ok {
appctx.GetLogger(ctx).Error().Interface("fs", fs).Msg("storage is not a tus datastore")
return
}
upload, err := datastore.GetUpload(ctx, id)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Msg("could not get upload from storage")
Expand All @@ -200,14 +228,29 @@ func setHeaders(fs storage.FS, w http.ResponseWriter, r *http.Request) {
appctx.GetLogger(ctx).Error().Err(err).Msg("could not get upload info for upload")
return
}
expires := info.MetaData["expires"]
expires := info.MetaData[CS3Prefix+"expires"]
// fallback for outdated storageproviders that implement a tus datastore
if expires == "" {
expires = info.MetaData["expires"]
}
if expires != "" {
w.Header().Set(net.HeaderTusUploadExpires, expires)
}
resourceid := provider.ResourceId{
StorageId: info.MetaData["providerID"],
SpaceId: info.Storage["SpaceRoot"],
OpaqueId: info.Storage["NodeId"],
StorageId: info.MetaData[CS3Prefix+"providerID"],
SpaceId: info.MetaData[CS3Prefix+"SpaceRoot"],
OpaqueId: info.MetaData[CS3Prefix+"NodeId"],
}
// fallback for outdated storageproviders that implement a tus datastore
if resourceid.StorageId == "" {
resourceid.StorageId = info.MetaData["providerID"]
}
if resourceid.SpaceId == "" {
resourceid.SpaceId = info.MetaData["SpaceRoot"]
}
if resourceid.OpaqueId == "" {
resourceid.OpaqueId = info.MetaData["NodeId"]
}

w.Header().Set(net.HeaderOCFileID, storagespace.FormatResourceID(resourceid))
}
6 changes: 5 additions & 1 deletion pkg/storage/fs/ocis/ocis.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ package ocis

import (
"path"
"path/filepath"

"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/fs/ocis/blobstore"
"github.com/cs3org/reva/v2/pkg/storage/fs/registry"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
"github.com/tus/tusd/pkg/filestore"
)

func init() {
Expand All @@ -46,5 +48,7 @@ func New(m map[string]interface{}, stream events.Stream) (storage.FS, error) {
return nil, err
}

return decomposedfs.NewDefault(m, bs, stream)
tusDataStore := filestore.New(filepath.Join(o.Root, "uploads"))

return decomposedfs.NewDefault(m, bs, tusDataStore, stream)
}
16 changes: 16 additions & 0 deletions pkg/storage/fs/s3ng/blobstore/blobstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,22 @@ func New(endpoint, region, bucket, accessKey, secretKey string) (*Blobstore, err
}, nil
}

// MoveBlob copies the object at the given bucket/key into this blobstore at
// the node's blob path using a server-side S3 copy. Despite the name, the
// source object is not removed by this call.
// NOTE(review): the source parameter is unused here — presumably a leftover
// from an earlier signature; verify against callers before removing it.
func (bs *Blobstore) MoveBlob(node *node.Node, source, bucket, key string) error {
	_, err := bs.client.CopyObject(context.Background(), minio.CopyDestOptions{
		Bucket: bs.bucket,
		Object: bs.path(node),
	}, minio.CopySrcOptions{
		Bucket: bucket,
		Object: key,
	})
	if err != nil {
		// report source first, destination second, matching the copy direction
		return errors.Wrapf(err, "could not copy object bucket:'%s' key:'%s' to bucket:'%s' key:'%s'", bucket, key, bs.bucket, bs.path(node))
	}
	return nil
}

// Upload stores some data in the blobstore under the given key
func (bs *Blobstore) Upload(node *node.Node, source string) error {
reader, err := os.Open(source)
Expand Down
15 changes: 15 additions & 0 deletions pkg/storage/fs/s3ng/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,21 @@ type Options struct {

// Secret key for the s3 blobstore
S3SecretKey string `mapstructure:"s3.secret_key"`

// UploadObjectPrefix for the s3 blobstore
S3UploadObjectPrefix string `mapstructure:"s3.upload_object_prefix"`

// UploadMetadataPrefix for the s3 blobstore
S3UploadMetadataPrefix string `mapstructure:"s3.upload_metadata_prefix"`

// UploadTemporaryDirectory for the s3 blobstore
S3UploadTemporaryDirectory string `mapstructure:"s3.upload_temporary_directory"`

// DisableSSL for the s3 blobstore
S3DisableSSL bool `mapstructure:"s3.disable_ssl"`

// ForcePathStyle for the s3 blobstore
S3ForcePathStyle bool `mapstructure:"s3.force_path_style"`
}

// S3ConfigComplete return true if all required s3 fields are set
Expand Down
19 changes: 18 additions & 1 deletion pkg/storage/fs/s3ng/s3ng.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,16 @@ package s3ng
import (
"fmt"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/fs/registry"
"github.com/cs3org/reva/v2/pkg/storage/fs/s3ng/blobstore"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs"
"github.com/tus/tusd/pkg/s3store"
)

func init() {
Expand All @@ -49,5 +54,17 @@ func New(m map[string]interface{}, stream events.Stream) (storage.FS, error) {
return nil, err
}

return decomposedfs.NewDefault(m, bs, stream)
s3Config := aws.NewConfig()
s3Config.WithCredentials(credentials.NewStaticCredentials(o.S3AccessKey, o.S3SecretKey, "")).
WithEndpoint(o.S3Endpoint).
WithRegion(o.S3Region).
WithS3ForcePathStyle(o.S3ForcePathStyle).
WithDisableSSL(o.S3DisableSSL)

tusDataStore := s3store.New(o.S3Bucket, s3.New(session.Must(session.NewSession()), s3Config))
tusDataStore.ObjectPrefix = o.S3UploadObjectPrefix
tusDataStore.MetadataObjectPrefix = o.S3UploadMetadataPrefix
tusDataStore.TemporaryDirectory = o.S3UploadTemporaryDirectory

return decomposedfs.NewDefault(m, bs, tusDataStore, stream)
}
Loading

0 comments on commit 1293660

Please sign in to comment.