diff --git a/.drone.star b/.drone.star index 44da75e7a9..6da21a88d3 100644 --- a/.drone.star +++ b/.drone.star @@ -692,6 +692,7 @@ def s3ngIntegrationTests(parallelRuns, skipExceptParts = []): "DIVIDE_INTO_NUM_PARTS": parallelRuns, "RUN_PART": runPart, "EXPECTED_FAILURES_FILE": "/drone/src/tests/acceptance/expected-failures-on-S3NG-storage.md", + "BEHAT_FEATURE": "tests/acceptance/features/coreApiTrashbin/trashbinDelete.feature:17", }, }, ], diff --git a/changelog/unreleased/tusd-data-storage.md b/changelog/unreleased/tusd-data-storage.md new file mode 100644 index 0000000000..5a518d6196 --- /dev/null +++ b/changelog/unreleased/tusd-data-storage.md @@ -0,0 +1,5 @@ +Enhancement: Use Tusd data storage implementations + +Decomposedfs now uses the data store implementation for uploads that comes with tusd instead of implementing the interface itself. This allows storing uploads directly in S3. When all bytes are transferred, tusd will call `PreFinishResponseCallback` if the storage driver implements it. + +https://github.com/cs3org/reva/pull/4148 \ No newline at end of file diff --git a/internal/grpc/services/storageprovider/storageprovider.go b/internal/grpc/services/storageprovider/storageprovider.go index f7812783cf..0132e92d8b 100644 --- a/internal/grpc/services/storageprovider/storageprovider.go +++ b/internal/grpc/services/storageprovider/storageprovider.go @@ -338,6 +338,15 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate }, nil } + // FIXME: This is a hack to transport more metadata to the storage.FS InitiateUpload implementation + // we should use a request object that can carry + // * if-match + // * if-unmodified-since + // * uploadLength from the tus Upload-Length header + // * checksum from the tus Upload-Checksum header + // * mtime from the X-OC-Mtime header + // * expires from the s.conf.UploadExpiration ... should that not be part of the driver? + // * providerID metadata := map[string]string{} ifMatch := req.GetIfMatch() if ifMatch != "" { diff --git a/internal/http/services/dataprovider/dataprovider.go b/internal/http/services/dataprovider/dataprovider.go index 8c3a31195f..b2f8d5eb8a 100644 --- a/internal/http/services/dataprovider/dataprovider.go +++ b/internal/http/services/dataprovider/dataprovider.go @@ -143,7 +143,11 @@ func getDataTXs(c *config, fs storage.FS, publisher events.Publisher) (map[strin if tx, err := f(c.DataTXs[t], publisher); err == nil { if handler, err := tx.Handler(fs); err == nil { txs[t] = handler + } else { + return nil, err } + } else { + return nil, err } } } diff --git a/internal/http/services/owncloud/ocdav/tus.go b/internal/http/services/owncloud/ocdav/tus.go index 6fbbbb6e43..96c06c83f7 100644 --- a/internal/http/services/owncloud/ocdav/tus.go +++ b/internal/http/services/owncloud/ocdav/tus.go @@ -88,6 +88,10 @@ func (s *svc) handleSpacesTusPost(w http.ResponseWriter, r *http.Request, spaceI sublog := appctx.GetLogger(ctx).With().Str("spaceid", spaceID).Str("path", r.URL.Path).Logger() + // use filename to build a storage space reference + // but what if the upload happens directly to the resourceid ... and filename is empty? + // currently there is always a validator that requires a non-empty filename ...
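The FIXME in storageprovider.go above asks for a typed request object instead of the stringly-typed metadata map. A minimal sketch of what such a struct could carry — purely hypothetical, the struct and its field names are not part of this PR:

```go
package storageprovider

import (
	"time"

	provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
)

// InitiateUploadRequest is a hypothetical typed carrier for the values the
// FIXME lists, so storage.FS implementations no longer have to parse them
// out of a map[string]string.
type InitiateUploadRequest struct {
	Ref               *provider.Reference // the upload target
	UploadLength      int64               // from the tus Upload-Length header
	Checksum          string              // from the tus Upload-Checksum header, "<algorithm> <value>"
	Mtime             string              // from the X-OC-Mtime header
	IfMatch           string              // etag precondition
	IfUnmodifiedSince time.Time           // time-based precondition
	Expires           time.Time           // derived from s.conf.UploadExpiration
	ProviderID        string              // gateway routing information
}
```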
+ // hm -> bug: clients currently cannot POST to an existing source with a resource id only ref, err := spacelookup.MakeStorageSpaceReference(spaceID, path.Join(r.URL.Path, meta["filename"])) if err != nil { w.WriteHeader(http.StatusBadRequest) diff --git a/pkg/rhttp/datatx/manager/tus/filter.go b/pkg/rhttp/datatx/manager/tus/filter.go new file mode 100644 index 0000000000..ca4eeaaeaa --- /dev/null +++ b/pkg/rhttp/datatx/manager/tus/filter.go @@ -0,0 +1,44 @@ +package tus + +import ( + "net/http" + "strings" + + tusd "github.com/tus/tusd/pkg/handler" +) + +type FilterResponseWriter struct { + w http.ResponseWriter + header http.Header +} + +const TusPrefix = "tus." +const CS3Prefix = "cs3." + +func NewFilterResponseWriter(w http.ResponseWriter) *FilterResponseWriter { + return &FilterResponseWriter{ + w: w, + header: http.Header{}, + } +} + +func (fw *FilterResponseWriter) Header() http.Header { + return fw.w.Header() +} + +func (fw *FilterResponseWriter) Write(b []byte) (int, error) { + return fw.w.Write(b) +} + +func (fw *FilterResponseWriter) WriteHeader(statusCode int) { + metadata := tusd.ParseMetadataHeader(fw.w.Header().Get("Upload-Metadata")) + tusMetadata := map[string]string{} + for k, v := range metadata { + if strings.HasPrefix(k, TusPrefix) { + tusMetadata[strings.TrimPrefix(k, TusPrefix)] = v + } + } + + fw.w.Header().Set("Upload-Metadata", tusd.SerializeMetadataHeader(tusMetadata)) + fw.w.WriteHeader(statusCode) +} diff --git a/pkg/rhttp/datatx/manager/tus/tus.go b/pkg/rhttp/datatx/manager/tus/tus.go index f98ce8adb1..755d5b922f 100644 --- a/pkg/rhttp/datatx/manager/tus/tus.go +++ b/pkg/rhttp/datatx/manager/tus/tus.go @@ -78,10 +78,6 @@ func New(m map[string]interface{}, publisher events.Publisher) (datatx.DataTX, e } func (m *manager) Handler(fs storage.FS) (http.Handler, error) { - composable, ok := fs.(composable) - if !ok { - return nil, errtypes.NotSupported("file system does not support the tus protocol") - } // A storage backend for tusd may consist of multiple different parts which // handle upload creation, locking, termination and so on. The composer is a @@ -89,15 +85,40 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) { // we only use the file store but you may plug in multiple. 
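The FilterResponseWriter above exists because the datatx stores reva-internal entries (cs3.*) in the same Upload-Metadata header as client-visible ones (tus.*). A small sketch of the filtering round-trip using tusd's metadata helpers, which the PR itself calls; the sample keys are made up:

```go
package main

import (
	"fmt"
	"strings"

	tusd "github.com/tus/tusd/pkg/handler"
)

func main() {
	// what ends up in the Upload-Metadata header internally: client-visible
	// keys use the "tus." prefix, internal bookkeeping the "cs3." prefix
	header := tusd.SerializeMetadataHeader(map[string]string{
		"tus.mtime":  "1692787200",
		"cs3.NodeId": "internal-node-id", // must not leak to clients
	})

	// what FilterResponseWriter.WriteHeader does: keep only tus.* entries
	// and strip the prefix before the header goes back out
	filtered := map[string]string{}
	for k, v := range tusd.ParseMetadataHeader(header) {
		if strings.HasPrefix(k, "tus.") {
			filtered[strings.TrimPrefix(k, "tus.")] = v
		}
	}

	fmt.Println(tusd.SerializeMetadataHeader(filtered)) // only the mtime survives
}
```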
composer := tusd.NewStoreComposer() - // let the composable storage tell tus which extensions it supports - composable.UseIn(composer) - config := tusd.Config{ - StoreComposer: composer, + StoreComposer: composer, + PreUploadCreateCallback: func(hook tusd.HookEvent) error { + return errors.New("uploads must be created with a cs3 InitiateUpload call") + }, NotifyCompleteUploads: true, Logger: log.New(appctx.GetLogger(context.Background()), "", 0), } + var dataStore tusd.DataStore + + cb, ok := fs.(hasTusDatastore) + if ok { + dataStore = cb.GetDataStore() + composable, ok := dataStore.(composable) + if !ok { + return nil, errtypes.NotSupported("tus datastore is not composable") + } + composable.UseIn(composer) + config.PreFinishResponseCallback = cb.PreFinishResponseCallback + } else { + composable, ok := fs.(composable) + if !ok { + return nil, errtypes.NotSupported("file system does not support the tus protocol") + } + + // let the composable storage tell tus which extensions it supports + composable.UseIn(composer) + dataStore, ok = fs.(tusd.DataStore) + if !ok { + return nil, errtypes.NotSupported("file system does not support the tus datastore") + } + } + handler, err := tusd.NewUnroutedHandler(config) if err != nil { return nil, err @@ -108,23 +129,27 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) { ev := <-handler.CompleteUploads info := ev.Upload spaceOwner := &userv1beta1.UserId{ - OpaqueId: info.Storage["SpaceOwnerOrManager"], + OpaqueId: info.MetaData[CS3Prefix+"SpaceOwnerOrManager"], } - owner := &userv1beta1.UserId{ - Idp: info.Storage["Idp"], - OpaqueId: info.Storage["UserId"], + executant := &userv1beta1.UserId{ + Type: userv1beta1.UserType(userv1beta1.UserType_value[info.MetaData[CS3Prefix+"ExecutantType"]]), + Idp: info.MetaData[CS3Prefix+"ExecutantIdp"], + OpaqueId: info.MetaData[CS3Prefix+"ExecutantId"], } ref := &provider.Reference{ ResourceId: &provider.ResourceId{ - StorageId: info.MetaData["providerID"], - SpaceId: info.Storage["SpaceRoot"], - OpaqueId: info.Storage["SpaceRoot"], + StorageId: info.MetaData[CS3Prefix+"providerID"], + SpaceId: info.MetaData[CS3Prefix+"SpaceRoot"], + OpaqueId: info.MetaData[CS3Prefix+"SpaceRoot"], }, - Path: utils.MakeRelativePath(filepath.Join(info.MetaData["dir"], info.MetaData["filename"])), + // FIXME this seems wrong, path is not really relative to space root + // actually it is: InitiateUpload calls fs.lu.Path to get the path relative to the root... + // hm is that robust? what if the file is moved? shouldn't we store the parent id, then? 
+ Path: utils.MakeRelativePath(filepath.Join(info.MetaData[CS3Prefix+"dir"], info.MetaData[CS3Prefix+"filename"])), } - datatx.InvalidateCache(owner, ref, m.statCache) + datatx.InvalidateCache(executant, ref, m.statCache) if m.publisher != nil { - if err := datatx.EmitFileUploadedEvent(spaceOwner, owner, ref, m.publisher); err != nil { + if err := datatx.EmitFileUploadedEvent(spaceOwner, executant, ref, m.publisher); err != nil { appctx.GetLogger(context.Background()).Error().Err(err).Msg("failed to publish FileUploaded event") } } @@ -132,6 +157,9 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) { }() h := handler.Middleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // filter metadata headers + w = NewFilterResponseWriter(w) + method := r.Method // https://github.com/tus/tus-resumable-upload-protocol/blob/master/protocol.md#x-http-method-override if r.Header.Get("X-HTTP-Method-Override") != "" { @@ -145,7 +173,7 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) { metrics.UploadsActive.Sub(1) }() // set etag, mtime and file id - setHeaders(fs, w, r) + setHeaders(dataStore, w, r) handler.PostFile(w, r) case "HEAD": handler.HeadFile(w, r) @@ -155,7 +183,7 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) { metrics.UploadsActive.Sub(1) }() // set etag, mtime and file id - setHeaders(fs, w, r) + setHeaders(dataStore, w, r) handler.PatchFile(w, r) case "DELETE": handler.DelFile(w, r) @@ -182,14 +210,14 @@ type composable interface { UseIn(composer *tusd.StoreComposer) } -func setHeaders(fs storage.FS, w http.ResponseWriter, r *http.Request) { +type hasTusDatastore interface { + PreFinishResponseCallback(hook tusd.HookEvent) error + GetDataStore() tusd.DataStore +} + +func setHeaders(datastore tusd.DataStore, w http.ResponseWriter, r *http.Request) { ctx := r.Context() id := path.Base(r.URL.Path) - datastore, ok := fs.(tusd.DataStore) - if !ok { - appctx.GetLogger(ctx).Error().Interface("fs", fs).Msg("storage is not a tus datastore") - return - } upload, err := datastore.GetUpload(ctx, id) if err != nil { appctx.GetLogger(ctx).Error().Err(err).Msg("could not get upload from storage") @@ -200,14 +228,29 @@ func setHeaders(fs storage.FS, w http.ResponseWriter, r *http.Request) { appctx.GetLogger(ctx).Error().Err(err).Msg("could not get upload info for upload") return } - expires := info.MetaData["expires"] + expires := info.MetaData[CS3Prefix+"expires"] + // fallback for outdated storageproviders that implement a tus datastore + if expires == "" { + expires = info.MetaData["expires"] + } if expires != "" { w.Header().Set(net.HeaderTusUploadExpires, expires) } resourceid := provider.ResourceId{ - StorageId: info.MetaData["providerID"], - SpaceId: info.Storage["SpaceRoot"], - OpaqueId: info.Storage["NodeId"], + StorageId: info.MetaData[CS3Prefix+"providerID"], + SpaceId: info.MetaData[CS3Prefix+"SpaceRoot"], + OpaqueId: info.MetaData[CS3Prefix+"NodeId"], } + // fallback for outdated storageproviders that implement a tus datastore + if resourceid.StorageId == "" { + resourceid.StorageId = info.MetaData["providerID"] + } + if resourceid.SpaceId == "" { + resourceid.SpaceId = info.MetaData["SpaceRoot"] + } + if resourceid.OpaqueId == "" { + resourceid.OpaqueId = info.MetaData["NodeId"] + } + w.Header().Set(net.HeaderOCFileID, storagespace.FormatResourceID(resourceid)) } diff --git a/pkg/storage/fs/ocis/ocis.go b/pkg/storage/fs/ocis/ocis.go index 812b44e4f7..a9583f7c91 100644 --- a/pkg/storage/fs/ocis/ocis.go +++ 
b/pkg/storage/fs/ocis/ocis.go @@ -20,6 +20,7 @@ package ocis import ( "path" + "path/filepath" "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/storage" @@ -27,6 +28,7 @@ import ( "github.com/cs3org/reva/v2/pkg/storage/fs/registry" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" + "github.com/tus/tusd/pkg/filestore" ) func init() { @@ -46,5 +48,7 @@ func New(m map[string]interface{}, stream events.Stream) (storage.FS, error) { return nil, err } - return decomposedfs.NewDefault(m, bs, stream) + tusDataStore := filestore.New(filepath.Join(o.Root, "uploads")) + + return decomposedfs.NewDefault(m, bs, tusDataStore, stream) } diff --git a/pkg/storage/fs/s3ng/blobstore/blobstore.go b/pkg/storage/fs/s3ng/blobstore/blobstore.go index 9c744e7540..d4b6926ae7 100644 --- a/pkg/storage/fs/s3ng/blobstore/blobstore.go +++ b/pkg/storage/fs/s3ng/blobstore/blobstore.go @@ -63,6 +63,23 @@ func New(endpoint, region, bucket, accessKey, secretKey string) (*Blobstore, err }, nil } +// MoveBlob moves the object at the given bucket and key into this blobstore's bucket, at the path derived from the node +func (bs *Blobstore) MoveBlob(node *node.Node, source, bucket, key string) error { + + _, err := bs.client.CopyObject(context.Background(), minio.CopyDestOptions{ + Bucket: bs.bucket, + Object: bs.path(node), + }, minio.CopySrcOptions{ + Bucket: bucket, + Object: key, + }) + + if err != nil { + return errors.Wrapf(err, "could not copy object from bucket:'%s' key:'%s' to bucket:'%s' key:'%s'", bucket, key, bs.bucket, bs.path(node)) + } + return nil +} + // Upload stores some data in the blobstore under the given key func (bs *Blobstore) Upload(node *node.Node, source string) error { reader, err := os.Open(source) diff --git a/pkg/storage/fs/s3ng/option.go b/pkg/storage/fs/s3ng/option.go index 877a7d7189..cf8fc08ecc 100644 --- a/pkg/storage/fs/s3ng/option.go +++ b/pkg/storage/fs/s3ng/option.go @@ -43,6 +43,21 @@ type Options struct { // Secret key for the s3 blobstore S3SecretKey string `mapstructure:"s3.secret_key"` + + // UploadObjectPrefix for the s3 blobstore + S3UploadObjectPrefix string `mapstructure:"s3.upload_object_prefix"` + + // UploadMetadataPrefix for the s3 blobstore + S3UploadMetadataPrefix string `mapstructure:"s3.upload_metadata_prefix"` + + // UploadTemporaryDirectory for the s3 blobstore + S3UploadTemporaryDirectory string `mapstructure:"s3.upload_temporary_directory"` + + // DisableSSL for the s3 blobstore + S3DisableSSL bool `mapstructure:"s3.disable_ssl"` + + // ForcePathStyle for the s3 blobstore + S3ForcePathStyle bool `mapstructure:"s3.force_path_style"` } // S3ConfigComplete return true if all required s3 fields are set diff --git a/pkg/storage/fs/s3ng/s3ng.go b/pkg/storage/fs/s3ng/s3ng.go index 5cc7f8873b..ef5eca87a8 100644 --- a/pkg/storage/fs/s3ng/s3ng.go +++ b/pkg/storage/fs/s3ng/s3ng.go @@ -21,11 +21,16 @@ package s3ng import ( "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/fs/registry" "github.com/cs3org/reva/v2/pkg/storage/fs/s3ng/blobstore" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs" + "github.com/tus/tusd/pkg/s3store" ) func init() { @@ -49,5 +54,17 @@ func New(m map[string]interface{}, stream events.Stream) (storage.FS, error) { 
return nil, err } - return decomposedfs.NewDefault(m, bs, stream) + s3Config := aws.NewConfig() + s3Config.WithCredentials(credentials.NewStaticCredentials(o.S3AccessKey, o.S3SecretKey, "")). + WithEndpoint(o.S3Endpoint). + WithRegion(o.S3Region). + WithS3ForcePathStyle(o.S3ForcePathStyle). + WithDisableSSL(o.S3DisableSSL) + + tusDataStore := s3store.New(o.S3Bucket, s3.New(session.Must(session.NewSession()), s3Config)) + tusDataStore.ObjectPrefix = o.S3UploadObjectPrefix + tusDataStore.MetadataObjectPrefix = o.S3UploadMetadataPrefix + tusDataStore.TemporaryDirectory = o.S3UploadTemporaryDirectory + + return decomposedfs.NewDefault(m, bs, tusDataStore, stream) } diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go index 5c341c7357..e42d707c14 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -34,7 +34,6 @@ import ( "strings" "time" - user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" rpcv1beta1 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" @@ -61,6 +60,7 @@ import ( "github.com/cs3org/reva/v2/pkg/utils" "github.com/jellydator/ttlcache/v2" "github.com/pkg/errors" + tusHandler "github.com/tus/tusd/pkg/handler" microstore "go-micro.dev/v4/store" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/trace" @@ -96,10 +96,6 @@ type Tree interface { RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPath string, target *node.Node) (*node.Node, *node.Node, func() error, error) PurgeRecycleItemFunc(ctx context.Context, spaceid, key, purgePath string) (*node.Node, func() error, error) - WriteBlob(node *node.Node, source string) error - ReadBlob(node *node.Node) (io.ReadCloser, error) - DeleteBlob(node *node.Node) error - Propagate(ctx context.Context, node *node.Node, sizeDiff int64) (err error) } @@ -112,6 +108,8 @@ type Decomposedfs struct { chunkHandler *chunking.ChunkHandler stream events.Stream cache cache.StatCache + tusDataStore tusHandler.DataStore + blobstore tree.Blobstore UserCache *ttlcache.Cache userSpaceIndex *spaceidindex.Index @@ -120,7 +118,7 @@ type Decomposedfs struct { } // NewDefault returns an instance with default components -func NewDefault(m map[string]interface{}, bs tree.Blobstore, es events.Stream) (storage.FS, error) { +func NewDefault(m map[string]interface{}, bs tree.Blobstore, tusDataStore tusHandler.DataStore, es events.Stream) (storage.FS, error) { o, err := options.New(m) if err != nil { return nil, err @@ -152,12 +150,12 @@ func NewDefault(m map[string]interface{}, bs tree.Blobstore, es events.Stream) ( permissions := NewPermissions(node.NewPermissions(lu), permissionsSelector) - return New(o, lu, permissions, tp, es) + return New(o, lu, permissions, tp, es, tusDataStore, bs) } // New returns an implementation of the storage.FS interface that talks to // a local filesystem. 
-func New(o *options.Options, lu *lookup.Lookup, p Permissions, tp Tree, es events.Stream) (storage.FS, error) { +func New(o *options.Options, lu *lookup.Lookup, p Permissions, tp Tree, es events.Stream, tusDataStore tusHandler.DataStore, blobstore tree.Blobstore) (storage.FS, error) { log := logger.New() err := tp.Setup() if err != nil { @@ -208,6 +206,8 @@ func New(o *options.Options, lu *lookup.Lookup, p Permissions, tp Tree, es event userSpaceIndex: userSpaceIndex, groupSpaceIndex: groupSpaceIndex, spaceTypeIndex: spaceTypeIndex, + tusDataStore: tusDataStore, + blobstore: blobstore, } if o.AsyncFileUploads { @@ -226,258 +226,13 @@ func New(o *options.Options, lu *lookup.Lookup, p Permissions, tp Tree, es event } for i := 0; i < o.Events.NumConsumers; i++ { - go fs.Postprocessing(ch) + go upload.Postprocessing(lu, tp, fs.cache, es, tusDataStore, blobstore, fs.downloadURL, ch) } } return fs, nil } -// Postprocessing starts the postprocessing result collector -func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { - ctx := context.TODO() // we should pass the trace id in the event and initialize the trace provider here - ctx, span := tracer.Start(ctx, "Postprocessing") - defer span.End() - log := logger.New() - for event := range ch { - switch ev := event.Event.(type) { - case events.PostprocessingFinished: - up, err := upload.Get(ctx, ev.UploadID, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens) - if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload") - continue // NOTE: since we can't get the upload, we can't delete the blob - } - - var ( - failed bool - keepUpload bool - ) - - n, err := node.ReadNode(ctx, fs.lu, up.Info.Storage["SpaceRoot"], up.Info.Storage["NodeId"], false, nil, true) - if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not read node") - continue - } - up.Node = n - - switch ev.Outcome { - default: - log.Error().Str("outcome", string(ev.Outcome)).Str("uploadID", ev.UploadID).Msg("unknown postprocessing outcome - aborting") - fallthrough - case events.PPOutcomeAbort: - failed = true - keepUpload = true - case events.PPOutcomeContinue: - if err := up.Finalize(); err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not finalize upload") - keepUpload = true // should we keep the upload when assembling failed? 
- failed = true - } - case events.PPOutcomeDelete: - failed = true - } - - getParent := func() *node.Node { - p, err := up.Node.Parent(ctx) - if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not read parent") - return nil - } - return p - } - - now := time.Now() - if failed { - // propagate sizeDiff after failed postprocessing - if err := fs.tp.Propagate(ctx, up.Node, -up.SizeDiff); err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not propagate tree size change") - } - } else if p := getParent(); p != nil { - // update parent tmtime to propagate etag change after successful postprocessing - _ = p.SetTMTime(ctx, &now) - if err := fs.tp.Propagate(ctx, p, 0); err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not propagate etag change") - } - } - - upload.Cleanup(up, failed, keepUpload) - - // remove cache entry in gateway - fs.cache.RemoveStatContext(ctx, ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) - - if err := events.Publish( - ctx, - fs.stream, - events.UploadReady{ - UploadID: ev.UploadID, - Failed: failed, - ExecutingUser: ev.ExecutingUser, - Filename: ev.Filename, - FileRef: &provider.Reference{ - ResourceId: &provider.ResourceId{ - StorageId: up.Info.MetaData["providerID"], - SpaceId: up.Info.Storage["SpaceRoot"], - OpaqueId: up.Info.Storage["SpaceRoot"], - }, - Path: utils.MakeRelativePath(filepath.Join(up.Info.MetaData["dir"], up.Info.MetaData["filename"])), - }, - Timestamp: utils.TimeToTS(now), - SpaceOwner: n.SpaceOwnerOrManager(ctx), - }, - ); err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish UploadReady event") - } - case events.RestartPostprocessing: - up, err := upload.Get(ctx, ev.UploadID, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens) - if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload") - continue - } - n, err := node.ReadNode(ctx, fs.lu, up.Info.Storage["SpaceRoot"], up.Info.Storage["NodeId"], false, nil, true) - if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not read node") - continue - } - s, err := up.URL(up.Ctx) - if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not create url") - continue - } - // restart postprocessing - if err := events.Publish(ctx, fs.stream, events.BytesReceived{ - UploadID: up.Info.ID, - URL: s, - SpaceOwner: n.SpaceOwnerOrManager(up.Ctx), - ExecutingUser: &user.User{Id: &user.UserId{OpaqueId: "postprocessing-restart"}}, // send nil instead? - ResourceID: &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}, - Filename: up.Info.Storage["NodeName"], - Filesize: uint64(up.Info.Size), - }); err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish BytesReceived event") - } - case events.PostprocessingStepFinished: - if ev.FinishedStep != events.PPStepAntivirus { - // atm we are only interested in antivirus results - continue - } - - res := ev.Result.(events.VirusscanResult) - if res.ErrorMsg != "" { - // scan failed somehow - // Should we handle this here? 
- continue - } - - var n *node.Node - switch ev.UploadID { - case "": - // uploadid is empty -> this was an on-demand scan - /* ON DEMAND SCANNING NOT SUPPORTED ATM - ctx := ctxpkg.ContextSetUser(context.Background(), ev.ExecutingUser) - ref := &provider.Reference{ResourceId: ev.ResourceID} - - no, err := fs.lu.NodeFromResource(ctx, ref) - if err != nil { - log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Failed to get node after scan") - continue - - } - n = no - if ev.Outcome == events.PPOutcomeDelete { - // antivir wants us to delete the file. We must obey and need to - - // check if there a previous versions existing - revs, err := fs.ListRevisions(ctx, ref) - if len(revs) == 0 { - if err != nil { - log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Failed to list revisions. Fallback to delete file") - } - - // no versions -> trash file - err := fs.Delete(ctx, ref) - if err != nil { - log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Failed to delete infected resource") - continue - } - - // now purge it from the recycle bin - if err := fs.PurgeRecycleItem(ctx, &provider.Reference{ResourceId: &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.SpaceID}}, n.ID, "/"); err != nil { - log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Failed to purge infected resource from trash") - } - - // remove cache entry in gateway - fs.cache.RemoveStatContext(ctx, ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) - continue - } - - // we have versions - find the newest - versions := make(map[uint64]string) // remember all versions - we need them later - var nv uint64 - for _, v := range revs { - versions[v.Mtime] = v.Key - if v.Mtime > nv { - nv = v.Mtime - } - } - - // restore newest version - if err := fs.RestoreRevision(ctx, ref, versions[nv]); err != nil { - log.Error().Err(err).Interface("resourceID", ev.ResourceID).Str("revision", versions[nv]).Msg("Failed to restore revision") - continue - } - - // now find infected version - revs, err = fs.ListRevisions(ctx, ref) - if err != nil { - log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Error listing revisions after restore") - } - - for _, v := range revs { - // we looking for a version that was previously not there - if _, ok := versions[v.Mtime]; ok { - continue - } - - if err := fs.DeleteRevision(ctx, ref, v.Key); err != nil { - log.Error().Err(err).Interface("resourceID", ev.ResourceID).Str("revision", v.Key).Msg("Failed to delete revision") - } - } - - // remove cache entry in gateway - fs.cache.RemoveStatContext(ctx, ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) - continue - } - */ - default: - // uploadid is not empty -> this is an async upload - up, err := upload.Get(ctx, ev.UploadID, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens) - if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload") - continue - } - - no, err := node.ReadNode(up.Ctx, fs.lu, up.Info.Storage["SpaceRoot"], up.Info.Storage["NodeId"], false, nil, false) - if err != nil { - log.Error().Err(err).Interface("uploadID", ev.UploadID).Msg("Failed to get node after scan") - continue - } - - n = no - } - - if err := n.SetScanData(ctx, res.Description, res.Scandate); err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Interface("resourceID", res.ResourceID).Msg("Failed to set scan results") - continue - } - - // remove cache entry in gateway - 
fs.cache.RemoveStatContext(ctx, ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) - default: - log.Error().Interface("event", ev).Msg("Unknown event") - } - } -} - // Shutdown shuts down the storage func (fs *Decomposedfs) Shutdown(ctx context.Context) error { return nil @@ -1027,9 +782,12 @@ func (fs *Decomposedfs) Download(ctx context.Context, ref *provider.Reference) ( if currentEtag != expectedEtag { return nil, errtypes.Aborted(fmt.Sprintf("file changed from etag %s to %s", expectedEtag, currentEtag)) } - reader, err := fs.tp.ReadBlob(n) + if n.Blobsize == 0 { + return io.NopCloser(strings.NewReader("")), nil + } + reader, err := fs.blobstore.Download(n) if err != nil { - return nil, errors.Wrap(err, "Decomposedfs: error download blob '"+n.ID+"'") + return nil, errors.Wrap(err, "Decomposedfs: error downloading blob '"+n.BlobID+"' for node '"+n.ID+"'") } return reader, nil } diff --git a/pkg/storage/utils/decomposedfs/decomposedfs_test.go b/pkg/storage/utils/decomposedfs/decomposedfs_test.go index 0dacc5b2d2..18f770a920 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs_test.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs_test.go @@ -61,7 +61,7 @@ var _ = Describe("Decomposed", func() { _, err := decomposedfs.NewDefault(map[string]interface{}{ "root": env.Root, "permissionssvc": "any", - }, bs, nil) + }, bs, env.DataStore, nil) Expect(err).ToNot(HaveOccurred()) }) }) diff --git a/pkg/storage/utils/decomposedfs/mocks/Tree.go b/pkg/storage/utils/decomposedfs/mocks/Tree.go index 848f13e4f3..435d2f3fe8 100644 --- a/pkg/storage/utils/decomposedfs/mocks/Tree.go +++ b/pkg/storage/utils/decomposedfs/mocks/Tree.go @@ -25,8 +25,6 @@ import ( fs "io/fs" - io "io" - mock "github.com/stretchr/testify/mock" node "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" @@ -65,20 +63,6 @@ func (_m *Tree) Delete(ctx context.Context, _a1 *node.Node) error { return r0 } -// DeleteBlob provides a mock function with given fields: _a0 -func (_m *Tree) DeleteBlob(_a0 *node.Node) error { - ret := _m.Called(_a0) - - var r0 error - if rf, ok := ret.Get(0).(func(*node.Node) error); ok { - r0 = rf(_a0) - } else { - r0 = ret.Error(0) - } - - return r0 -} - // GetMD provides a mock function with given fields: ctx, _a1 func (_m *Tree) GetMD(ctx context.Context, _a1 *node.Node) (fs.FileInfo, error) { ret := _m.Called(ctx, _a1) @@ -194,32 +178,6 @@ func (_m *Tree) PurgeRecycleItemFunc(ctx context.Context, spaceid string, key st return r0, r1, r2 } -// ReadBlob provides a mock function with given fields: _a0 -func (_m *Tree) ReadBlob(_a0 *node.Node) (io.ReadCloser, error) { - ret := _m.Called(_a0) - - var r0 io.ReadCloser - var r1 error - if rf, ok := ret.Get(0).(func(*node.Node) (io.ReadCloser, error)); ok { - return rf(_a0) - } - if rf, ok := ret.Get(0).(func(*node.Node) io.ReadCloser); ok { - r0 = rf(_a0) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(io.ReadCloser) - } - } - - if rf, ok := ret.Get(1).(func(*node.Node) error); ok { - r1 = rf(_a0) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - // RestoreRecycleItemFunc provides a mock function with given fields: ctx, spaceid, key, trashPath, target func (_m *Tree) RestoreRecycleItemFunc(ctx context.Context, spaceid string, key string, trashPath string, target *node.Node) (*node.Node, *node.Node, func() error, error) { ret := _m.Called(ctx, spaceid, key, trashPath, target) @@ -292,20 +250,6 @@ func (_m *Tree) TouchFile(ctx context.Context, _a1 *node.Node, markprocessing bo return r0 
} -// WriteBlob provides a mock function with given fields: _a0, source -func (_m *Tree) WriteBlob(_a0 *node.Node, source string) error { - ret := _m.Called(_a0, source) - - var r0 error - if rf, ok := ret.Get(0).(func(*node.Node, string) error); ok { - r0 = rf(_a0, source) - } else { - r0 = ret.Error(0) - } - - return r0 -} - type mockConstructorTestingTNewTree interface { mock.TestingT Cleanup(func()) diff --git a/pkg/storage/utils/decomposedfs/node/node.go b/pkg/storage/utils/decomposedfs/node/node.go index 3eb29ea4d5..bcf507e6cd 100644 --- a/pkg/storage/utils/decomposedfs/node/node.go +++ b/pkg/storage/utils/decomposedfs/node/node.go @@ -114,6 +114,7 @@ func New(spaceID, id, parentID, name string, blobsize int64, blobID string, t pr if blobID == "" { blobID = uuid.New().String() } + // hm but dirs have no blob id return &Node{ SpaceID: spaceID, ID: id, @@ -127,6 +128,35 @@ func New(spaceID, id, parentID, name string, blobsize int64, blobID string, t pr } } +func (n *Node) ReadRevision(ctx context.Context, revision string) (*Node, error) { + + rn := &Node{ + SpaceID: n.SpaceID, + ID: n.ID + RevisionIDDelimiter + revision, + ParentID: n.ParentID, + Name: n.Name, + owner: n.owner, + lu: n.lu, + nodeType: n.nodeType, + } + attrs, err := rn.Xattrs(ctx) + switch { + case metadata.IsNotExist(err): + return rn, nil // swallow not found, the node defaults to exists = false + case err != nil: + return nil, err + } + rn.Exists = true + + rn.BlobID = attrs.String(prefixes.BlobIDAttr) + rn.Blobsize, err = attrs.Int64(prefixes.BlobsizeAttr) + if err != nil { + return nil, err + } + + return rn, nil +} + // Type returns the node's resource type func (n *Node) Type(ctx context.Context) provider.ResourceType { if n.nodeType != nil { @@ -895,7 +925,7 @@ func (n *Node) GetTMTime(ctx context.Context) (time.Time, error) { // GetMTime reads the mtime from the extended attributes, falling back to disk func (n *Node) GetMTime(ctx context.Context) (time.Time, error) { b, err := n.XattrString(ctx, prefixes.MTimeAttr) - if err != nil { + if err != nil || len(b) == 0 { fi, err := os.Lstat(n.InternalPath()) if err != nil { return time.Time{}, err diff --git a/pkg/storage/utils/decomposedfs/revisions.go b/pkg/storage/utils/decomposedfs/revisions.go index e078d78bac..043ae198d3 100644 --- a/pkg/storage/utils/decomposedfs/revisions.go +++ b/pkg/storage/utils/decomposedfs/revisions.go @@ -69,9 +69,11 @@ func (fs *Decomposedfs) ListRevisions(ctx context.Context, ref *provider.Referen revisions = []*provider.FileVersion{} np := n.InternalPath() + mtime, err := n.GetMTime(ctx) + currentRevisionPath := np + node.RevisionIDDelimiter + mtime.UTC().Format(time.RFC3339Nano) if items, err := filepath.Glob(np + node.RevisionIDDelimiter + "*"); err == nil { for i := range items { - if fs.lu.MetadataBackend().IsMetaFile(items[i]) || strings.HasSuffix(items[i], ".mlock") { + if fs.lu.MetadataBackend().IsMetaFile(items[i]) || strings.HasSuffix(items[i], ".mlock") || items[i] == currentRevisionPath { continue } @@ -159,7 +161,7 @@ func (fs *Decomposedfs) DownloadRevision(ctx context.Context, ref *provider.Refe revisionNode := node.Node{SpaceID: spaceID, BlobID: blobid, Blobsize: blobsize} // blobsize is needed for the s3ng blobstore - reader, err := fs.tp.ReadBlob(&revisionNode) + reader, err := fs.blobstore.Download(&revisionNode) if err != nil { return nil, errors.Wrapf(err, "Decomposedfs: could not download blob of revision '%s' for node '%s'", n.ID, revisionKey) } @@ -315,7 +317,7 @@ func (fs *Decomposedfs) DeleteRevision(ctx 
context.Context, ref *provider.Refere return err } - return fs.tp.DeleteBlob(n) + return fs.blobstore.Delete(n) } func (fs *Decomposedfs) getRevisionNode(ctx context.Context, ref *provider.Reference, revisionKey string, hasPermission func(*provider.ResourcePermissions) bool) (*node.Node, error) { diff --git a/pkg/storage/utils/decomposedfs/testhelpers/helpers.go b/pkg/storage/utils/decomposedfs/testhelpers/helpers.go index 78ceaaa493..10d6f61a7d 100644 --- a/pkg/storage/utils/decomposedfs/testhelpers/helpers.go +++ b/pkg/storage/utils/decomposedfs/testhelpers/helpers.go @@ -32,6 +32,7 @@ import ( "github.com/cs3org/reva/v2/pkg/store" "github.com/google/uuid" "github.com/stretchr/testify/mock" + "github.com/tus/tusd/pkg/filestore" "google.golang.org/grpc" userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" @@ -46,6 +47,7 @@ import ( "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree" treemocks "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/mocks" "github.com/cs3org/reva/v2/tests/helpers" + tusHandler "github.com/tus/tusd/pkg/handler" ) // TestEnv represents a test environment for unit tests @@ -55,6 +57,7 @@ type TestEnv struct { Tree *tree.Tree Permissions *mocks.PermissionsChecker Blobstore *treemocks.Blobstore + DataStore tusHandler.DataStore Owner *userpb.User DeleteAllSpacesUser *userpb.User DeleteHomeSpacesUser *userpb.User @@ -89,6 +92,7 @@ func NewTestEnv(config map[string]interface{}) (*TestEnv, error) { if err != nil { return nil, err } + dataStore := filestore.New(filepath.Join(tmpRoot, "uploads")) defaultConfig := map[string]interface{}{ "root": tmpRoot, "treetime_accounting": true, @@ -170,7 +174,7 @@ func NewTestEnv(config map[string]interface{}) (*TestEnv, error) { bs := &treemocks.Blobstore{} tree := tree.New(lu, bs, o, store.Create()) - fs, err := decomposedfs.New(o, lu, decomposedfs.NewPermissions(permissions, permissionsSelector), tree, nil) + fs, err := decomposedfs.New(o, lu, decomposedfs.NewPermissions(permissions, permissionsSelector), tree, nil, dataStore, bs) if err != nil { return nil, err } @@ -185,6 +189,7 @@ func NewTestEnv(config map[string]interface{}) (*TestEnv, error) { Lookup: lu, Permissions: permissions, Blobstore: bs, + DataStore: dataStore, Owner: owner, DeleteAllSpacesUser: deleteAllSpacesUser, DeleteHomeSpacesUser: deleteHomeSpacesUser, diff --git a/pkg/storage/utils/decomposedfs/tree/blobstore.go b/pkg/storage/utils/decomposedfs/tree/blobstore.go new file mode 100644 index 0000000000..589ea8c39c --- /dev/null +++ b/pkg/storage/utils/decomposedfs/tree/blobstore.go @@ -0,0 +1,42 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. 
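The test helpers above stand up a tusd filestore as the upload datastore, exactly as the ocis driver does for production. A standalone sketch of that datastore's lifecycle (tusd v1 API; the path and payload are illustrative):

```go
package main

import (
	"context"
	"fmt"
	"strings"

	"github.com/tus/tusd/pkg/filestore"
	tusd "github.com/tus/tusd/pkg/handler"
)

func main() {
	ctx := context.Background()
	store := filestore.New("/tmp/uploads") // one .bin/.info pair per upload

	// create an upload the way decomposedfs' InitiateUpload does
	upload, err := store.NewUpload(ctx, tusd.FileInfo{
		Size:     11,
		MetaData: tusd.MetaData{"cs3.filename": "hello.txt"},
	})
	if err != nil {
		panic(err)
	}

	// the datatx would stream the request body here
	if _, err := upload.WriteChunk(ctx, 0, strings.NewReader("hello world")); err != nil {
		panic(err)
	}

	info, _ := upload.GetInfo(ctx)
	fmt.Println(info.ID, info.Offset) // Offset == 11 once all bytes arrived
}
```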
+ +package tree + +import ( + "errors" + "io" + + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" +) + +//go:generate make --no-print-directory -C ../../../../.. mockery NAME=Blobstore + +// Blobstore defines an interface for storing blobs in a blobstore +type Blobstore interface { + Upload(node *node.Node, source string) error + Download(node *node.Node) (io.ReadCloser, error) + Delete(node *node.Node) error +} + +// BlobstoreMover is used to move a file from the upload to the final destination +type BlobstoreMover interface { + MoveBlob(n *node.Node, source, bucket, key string) error +} + +var ErrBlobstoreCannotMove = errors.New("cannot move") diff --git a/pkg/storage/utils/decomposedfs/tree/tree.go b/pkg/storage/utils/decomposedfs/tree/tree.go index e0610ed5d5..c725efa95f 100644 --- a/pkg/storage/utils/decomposedfs/tree/tree.go +++ b/pkg/storage/utils/decomposedfs/tree/tree.go @@ -19,10 +19,7 @@ package tree import ( - "bytes" "context" - "fmt" - "io" "io/fs" iofs "io/fs" "os" @@ -55,15 +52,6 @@ func init() { tracer = otel.Tracer("github.com/cs3org/reva/pkg/storage/utils/decomposedfs/tree") } -//go:generate make --no-print-directory -C ../../../../.. mockery NAME=Blobstore - -// Blobstore defines an interface for storing blobs in a blobstore -type Blobstore interface { - Upload(node *node.Node, source string) error - Download(node *node.Node) (io.ReadCloser, error) - Delete(node *node.Node) error -} - // Tree manages a hierarchical tree type Tree struct { lookup lookup.PathLookup @@ -391,7 +379,7 @@ func (t *Tree) ListFolder(ctx context.Context, n *node.Node) ([]*node.Node, erro child, err := node.ReadNode(ctx, t.lookup, n.SpaceID, nodeID, false, n.SpaceRoot, true) if err != nil { - return err + continue } // prevent listing denied resources @@ -662,7 +650,7 @@ func (t *Tree) removeNode(ctx context.Context, path string, n *node.Node) error // delete blob from blobstore if n.BlobID != "" { - if err := t.DeleteBlob(n); err != nil { + if err := t.blobstore.Delete(n); err != nil { log.Error().Err(err).Str("blobID", n.BlobID).Msg("error purging nodes blob") return err } @@ -675,7 +663,7 @@ func (t *Tree) removeNode(ctx context.Context, path string, n *node.Node) error return err } for _, rev := range revs { - if t.lookup.MetadataBackend().IsMetaFile(rev) { + if t.lookup.MetadataBackend().IsMetaFile(rev) || strings.HasSuffix(rev, ".mlock") { continue } @@ -691,7 +679,7 @@ func (t *Tree) removeNode(ctx context.Context, path string, n *node.Node) error } if bID != "" { - if err := t.DeleteBlob(&node.Node{SpaceID: n.SpaceID, BlobID: bID}); err != nil { + if err := t.blobstore.Delete(&node.Node{SpaceID: n.SpaceID, BlobID: bID}); err != nil { log.Error().Err(err).Str("revision", rev).Str("blobID", bID).Msg("error removing revision node blob") return err } @@ -707,32 +695,6 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err return t.propagator.Propagate(ctx, n, sizeDiff) } -// WriteBlob writes a blob to the blobstore -func (t *Tree) WriteBlob(node *node.Node, source string) error { - return t.blobstore.Upload(node, source) -} - -// ReadBlob reads a blob from the blobstore -func (t *Tree) ReadBlob(node *node.Node) (io.ReadCloser, error) { - if node.BlobID == "" { - // there is no blob yet - we are dealing with a 0 byte file - return io.NopCloser(bytes.NewReader([]byte{})), nil - } - return t.blobstore.Download(node) -} - -// DeleteBlob deletes a blob from the blobstore -func (t *Tree) DeleteBlob(node *node.Node) error { - if node == 
nil { - return fmt.Errorf("could not delete blob, nil node was given") - } - if node.BlobID == "" { - return fmt.Errorf("could not delete blob, node with empty blob id was given") - } - - return t.blobstore.Delete(node) -} - // TODO check if node exists? func (t *Tree) createDirNode(ctx context.Context, n *node.Node) (err error) { ctx, span := tracer.Start(ctx, "createDirNode") diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go index 40cb53b073..23997429de 100644 --- a/pkg/storage/utils/decomposedfs/upload.go +++ b/pkg/storage/utils/decomposedfs/upload.go @@ -20,6 +20,14 @@ package decomposedfs import ( "context" + "crypto/md5" + "crypto/sha1" + "encoding/hex" + "fmt" + "hash" + "hash/adler32" + "io" + "net/url" "os" "path/filepath" "regexp" @@ -27,6 +35,7 @@ import ( "strings" "time" + "github.com/golang-jwt/jwt" tusd "github.com/tus/tusd/pkg/handler" userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" @@ -34,214 +43,519 @@ import ( "github.com/cs3org/reva/v2/pkg/appctx" ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" "github.com/cs3org/reva/v2/pkg/errtypes" + "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/tus" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/utils/chunking" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload" + "github.com/cs3org/reva/v2/pkg/storagespace" "github.com/cs3org/reva/v2/pkg/utils" "github.com/pkg/errors" ) var _idRegexp = regexp.MustCompile(".*/([^/]+).info") -// Upload uploads data to the given resource -// TODO Upload (and InitiateUpload) needs a way to receive the expected checksum. -// Maybe in metadata as 'checksum' => 'sha1 aeosvp45w5xaeoe' = lowercase, space separated? -func (fs *Decomposedfs) Upload(ctx context.Context, req storage.UploadRequest, uff storage.UploadFinishedFunc) (provider.ResourceInfo, error) { - up, err := fs.GetUpload(ctx, req.Ref.GetPath()) +// InitiateUpload returns upload ids corresponding to different protocols it supports +// It creates a node for new files to persist the fileid for the new child. +// TODO read optional content for small files in this request +// TODO InitiateUpload (and Upload) needs a way to receive the expected checksum. Maybe in metadata as 'checksum' => 'sha1 aeosvp45w5xaeoe' = lowercase, space separated? 
+// TODO needs a way to handle unknown filesize, currently uses the context +// FIXME metadata is actually used to carry all kinds of headers +func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, headers map[string]string) (map[string]string, error) { + log := appctx.GetLogger(ctx) + + n, err := fs.lu.NodeFromResource(ctx, ref) + switch err.(type) { + case nil: + // ok + case errtypes.IsNotFound: + return nil, errtypes.PreconditionFailed(err.Error()) + default: + return nil, err + } + + // permissions are checked in NewUpload below + + relative, err := fs.lu.Path(ctx, n, node.NoCheck) if err != nil { - return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error retrieving upload") + return nil, err } - uploadInfo := up.(*upload.Upload) + tusMetadata := tusd.MetaData{} - p := uploadInfo.Info.Storage["NodeName"] - if chunking.IsChunked(p) { // check chunking v1 - var assembledFile string - p, assembledFile, err = fs.chunkHandler.WriteChunk(p, req.Body) - if err != nil { - return provider.ResourceInfo{}, err - } - if p == "" { - if err = uploadInfo.Terminate(ctx); err != nil { - return provider.ResourceInfo{}, errors.Wrap(err, "ocfs: error removing auxiliary files") - } - return provider.ResourceInfo{}, errtypes.PartialContent(req.Ref.String()) + // checksum is sent as tus Upload-Checksum header and should not magically become a metadata property + if checksum, ok := headers["checksum"]; ok { + parts := strings.SplitN(checksum, " ", 2) + if len(parts) != 2 { + return nil, errtypes.BadRequest("invalid checksum format. must be '[algorithm] [checksum]'") } - uploadInfo.Info.Storage["NodeName"] = p - fd, err := os.Open(assembledFile) - if err != nil { - return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error opening assembled file") + switch parts[0] { + case "sha1", "md5", "adler32": + tusMetadata[tus.CS3Prefix+"checksum"] = checksum + default: + return nil, errtypes.BadRequest("unsupported checksum algorithm: " + parts[0]) } - defer fd.Close() - defer os.RemoveAll(assembledFile) - req.Body = fd } - if _, err := uploadInfo.WriteChunk(ctx, 0, req.Body); err != nil { - return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error writing to binary file") + // if mtime has been set via tus metadata, expose it as tus metadata + if ocmtime, ok := headers["mtime"]; ok { + if ocmtime != "null" { + tusMetadata[tus.TusPrefix+"mtime"] = ocmtime + } } - if err := uploadInfo.FinishUpload(ctx); err != nil { - return provider.ResourceInfo{}, err + _, err = node.CheckQuota(ctx, n.SpaceRoot, n.Exists, uint64(n.Blobsize), uint64(uploadLength)) + if err != nil { + return nil, err } - if uff != nil { - info := uploadInfo.Info - uploadRef := &provider.Reference{ - ResourceId: &provider.ResourceId{ - StorageId: info.MetaData["providerID"], - SpaceId: info.Storage["SpaceRoot"], - OpaqueId: info.Storage["SpaceRoot"], - }, - Path: utils.MakeRelativePath(filepath.Join(info.MetaData["dir"], info.MetaData["filename"])), + // check permissions + var ( + checkNode *node.Node + path string + ) + if n.Exists { + // check permissions of file to be overwritten + checkNode = n + path, _ = storagespace.FormatReference(&provider.Reference{ResourceId: &provider.ResourceId{ + SpaceId: checkNode.SpaceID, + OpaqueId: checkNode.ID, + }}) + } else { + // check permissions of parent + parent, perr := n.Parent(ctx) + if perr != nil { + return nil, errors.Wrap(perr, "Decomposedfs: error getting parent "+n.ParentID) } - owner, ok := 
ctxpkg.ContextGetUser(uploadInfo.Ctx) - if !ok { - return provider.ResourceInfo{}, errtypes.PreconditionFailed("error getting user from uploadinfo context") + checkNode = parent + path, _ = storagespace.FormatReference(&provider.Reference{ResourceId: &provider.ResourceId{ + SpaceId: checkNode.SpaceID, + OpaqueId: checkNode.ID, + }, Path: n.Name}) + } + rp, err := fs.p.AssemblePermissions(ctx, checkNode) // context does not have a user? + switch { + case err != nil: + return nil, err + case !rp.InitiateFileUpload: + return nil, errtypes.PermissionDenied(path) + } + + // are we trying to overwrite a folder with a file? + if n.Exists && n.IsDir(ctx) { + return nil, errtypes.PreconditionFailed("resource is not a file") + } + + // check lock + // FIXME we cannot check the lock of a new file, because it would have to use the name ... + if err := n.CheckLock(ctx); err != nil { + return nil, err + } + + usr := ctxpkg.ContextMustGetUser(ctx) + + // treat 0 length uploads as deferred + sizeIsDeferred := false + if uploadLength == 0 { + sizeIsDeferred = true + } + + info := tusd.FileInfo{ + MetaData: tusMetadata, + Size: uploadLength, + SizeIsDeferred: sizeIsDeferred, + } + if lockID, ok := ctxpkg.ContextGetLockID(ctx); ok { + info.MetaData[tus.CS3Prefix+"lockid"] = lockID + } + info.MetaData[tus.CS3Prefix+"dir"] = filepath.Dir(relative) + + // rewrite filename for old chunking v1 + if chunking.IsChunked(n.Name) { + info.MetaData[tus.CS3Prefix+"chunk"] = n.Name + bi, err := chunking.GetChunkBLOBInfo(n.Name) + if err != nil { + return nil, err } - spaceOwner := &userpb.UserId{ - OpaqueId: info.Storage["SpaceOwnerOrManager"], + n.Name = bi.Path + } + + info.MetaData[tus.CS3Prefix+"filename"] = n.Name + info.MetaData[tus.CS3Prefix+"SpaceRoot"] = n.SpaceRoot.ID + info.MetaData[tus.CS3Prefix+"SpaceOwnerOrManager"] = n.SpaceOwnerOrManager(ctx).GetOpaqueId() + info.MetaData[tus.CS3Prefix+"providerID"] = headers["providerID"] + + info.MetaData[tus.CS3Prefix+"RevisionTime"] = time.Now().UTC().Format(time.RFC3339Nano) + info.MetaData[tus.CS3Prefix+"NodeId"] = n.ID + info.MetaData[tus.CS3Prefix+"NodeParentId"] = n.ParentID + + info.MetaData[tus.CS3Prefix+"ExecutantIdp"] = usr.Id.Idp + info.MetaData[tus.CS3Prefix+"ExecutantId"] = usr.Id.OpaqueId + info.MetaData[tus.CS3Prefix+"ExecutantType"] = utils.UserTypeToString(usr.Id.Type) + info.MetaData[tus.CS3Prefix+"ExecutantUserName"] = usr.Username + + info.MetaData[tus.CS3Prefix+"LogLevel"] = log.GetLevel().String() + + // expires has been set by the storageprovider, do not expose as metadata. It is sent as a tus Upload-Expires header + if expiration, ok := headers["expires"]; ok { + if expiration != "null" { // TODO this is set by the storageprovider ... it cannot be set by clients, so it can never be the string 'null' ... or can it??? + info.MetaData[tus.CS3Prefix+"expires"] = expiration } + } + // only check preconditions if they are not empty + // do not expose as metadata + if headers["if-match"] != "" { + info.MetaData[tus.CS3Prefix+"if-match"] = headers["if-match"] // TODO drop? 
+ } + if headers["if-none-match"] != "" { + info.MetaData[tus.CS3Prefix+"if-none-match"] = headers["if-none-match"] + } + if headers["if-unmodified-since"] != "" { + info.MetaData[tus.CS3Prefix+"if-unmodified-since"] = headers["if-unmodified-since"] } - ri := provider.ResourceInfo{ - // fill with at least fileid, mtime and etag - Id: &provider.ResourceId{ - StorageId: uploadInfo.Info.MetaData["providerID"], - SpaceId: uploadInfo.Info.Storage["SpaceRoot"], - OpaqueId: uploadInfo.Info.Storage["NodeId"], - }, - Etag: uploadInfo.Info.MetaData["etag"], + if info.MetaData[tus.CS3Prefix+"if-none-match"] == "*" && n.Exists { + return nil, errtypes.Aborted(fmt.Sprintf("parent %s already has a child %s", n.ID, n.Name)) } - if mtime, err := utils.MTimeToTS(uploadInfo.Info.MetaData["mtime"]); err == nil { - ri.Mtime = &mtime + // create the upload + upload, err := fs.tusDataStore.NewUpload(ctx, info) + if err != nil { + return nil, err } - return ri, nil + info, _ = upload.GetInfo(ctx) + + log.Debug().Interface("node", n).Interface("headers", headers).Msg("Decomposedfs: initiated upload") + + return map[string]string{ + "simple": info.ID, + "tus": info.ID, + }, nil } -// InitiateUpload returns upload ids corresponding to different protocols it supports -// TODO read optional content for small files in this request -// TODO InitiateUpload (and Upload) needs a way to receive the expected checksum. Maybe in metadata as 'checksum' => 'sha1 aeosvp45w5xaeoe' = lowercase, space separated? -func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (map[string]string, error) { +// GetDataStore returns the initialized datastore +func (fs *Decomposedfs) GetDataStore() tusd.DataStore { + return fs.tusDataStore +} + +// PreFinishResponseCallback is called by the tus datatx, after all bytes have been transferred +func (fs *Decomposedfs) PreFinishResponseCallback(hook tusd.HookEvent) error { + ctx := context.TODO() + appctx.GetLogger(ctx).Debug().Interface("hook", hook).Msg("got PreFinishResponseCallback") + ctx, span := tracer.Start(ctx, "PreFinishResponseCallback") + defer span.End() + + info := hook.Upload + up, err := fs.tusDataStore.GetUpload(ctx, info.ID) + if err != nil { + return err + } + + // put lockID from upload back into context + if info.MetaData[tus.CS3Prefix+"lockid"] != "" { + ctx = ctxpkg.ContextSetLockID(ctx, info.MetaData[tus.CS3Prefix+"lockid"]) + } + + log := appctx.GetLogger(ctx) + + // calculate the checksum of the written bytes + // they will all be written to the metadata later, so we cannot omit any of them + // TODO only calculate the checksum that was requested to match synchronously, the rest could be async ... but the tests currently expect all to be present + // TODO the hashes all implement BinaryMarshaler so we could try to persist the state for resumable upload. we would need to keep track of the copied bytes ... 
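The TODO above suggests persisting hash state so a resumed upload would not have to re-read every byte already written. Standard library hashes support exactly this via encoding.BinaryMarshaler (Go 1.10+). A sketch of the idea — not part of this PR:

```go
package main

import (
	"crypto/sha1"
	"encoding"
	"fmt"
)

func main() {
	h := sha1.New()
	h.Write([]byte("first chunk"))

	// persist the midstream hash state, e.g. next to the upload's .info file
	state, err := h.(encoding.BinaryMarshaler).MarshalBinary()
	if err != nil {
		panic(err)
	}

	// after a resume: restore the state and keep hashing, instead of
	// copying all previously written bytes through the hash again
	h2 := sha1.New()
	if err := h2.(encoding.BinaryUnmarshaler).UnmarshalBinary(state); err != nil {
		panic(err)
	}
	h2.Write([]byte(" second chunk"))
	fmt.Printf("%x\n", h2.Sum(nil)) // same digest as hashing both chunks in one go
}
```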
+ + sha1h := sha1.New() + md5h := md5.New() + adler32h := adler32.New() + { + _, subspan := tracer.Start(ctx, "GetReader") + reader, err := up.GetReader(ctx) + subspan.End() + if err != nil { + // we can continue if no oc checksum header is set + log.Info().Err(err).Interface("info", info).Msg("error getting Reader from upload") + } + if readCloser, ok := reader.(io.ReadCloser); ok { + defer readCloser.Close() + } + + r1 := io.TeeReader(reader, sha1h) + r2 := io.TeeReader(r1, md5h) + + _, subspan = tracer.Start(ctx, "io.Copy") + _, err = io.Copy(adler32h, r2) + subspan.End() + if err != nil { + log.Info().Err(err).Msg("error copying checksums") + } } - // permissions are checked in NewUpload below + // compare if they match the sent checksum + // TODO the tus checksum extension would do this on every chunk, but I currently don't see an easy way to pass in the requested checksum. for now we do it in FinishUpload which is also called for chunked uploads + if info.MetaData[tus.CS3Prefix+"checksum"] != "" { + var err error + parts := strings.SplitN(info.MetaData[tus.CS3Prefix+"checksum"], " ", 2) + if len(parts) != 2 { + return errtypes.BadRequest("invalid checksum format. must be '[algorithm] [checksum]'") + } + switch parts[0] { + case "sha1": + err = checkHash(parts[1], sha1h) + case "md5": + err = checkHash(parts[1], md5h) + case "adler32": + err = checkHash(parts[1], adler32h) + default: + err = errtypes.BadRequest("unsupported checksum algorithm: " + parts[0]) + } + if err != nil { + if tup, ok := up.(tusd.TerminatableUpload); ok { + terr := tup.Terminate(ctx) + if terr != nil { + log.Error().Err(terr).Msg("failed to terminate upload") + } + } + return err + } + } - relative, err := fs.lu.Path(ctx, n, node.NoCheck) + // update checksums + attrs := node.Attributes{ + prefixes.ChecksumPrefix + "sha1": sha1h.Sum(nil), + prefixes.ChecksumPrefix + "md5": md5h.Sum(nil), + prefixes.ChecksumPrefix + "adler32": adler32h.Sum(nil), + } + + n, err := upload.AddRevisionToNode(ctx, fs.lu, info, attrs) if err != nil { - return nil, err + upload.Cleanup(ctx, fs.lu, n, info, true) + if tup, ok := up.(tusd.TerminatableUpload); ok { + terr := tup.Terminate(ctx) + if terr != nil { + log.Error().Err(terr).Msg("failed to terminate upload") + } + } + return err } - lockID, _ := ctxpkg.ContextGetLockID(ctx) + if fs.stream != nil { + user := &userpb.User{ + Id: &userpb.UserId{ + Type: userpb.UserType(userpb.UserType_value[info.MetaData[tus.CS3Prefix+"ExecutantType"]]), + Idp: info.MetaData[tus.CS3Prefix+"ExecutantIdp"], + OpaqueId: info.MetaData[tus.CS3Prefix+"ExecutantId"], + }, + Username: info.MetaData[tus.CS3Prefix+"ExecutantUserName"], + } + s, err := fs.downloadURL(ctx, info.ID) + if err != nil { + return err + } - info := tusd.FileInfo{ - MetaData: tusd.MetaData{ - "filename": filepath.Base(relative), - "dir": filepath.Dir(relative), - "lockid": lockID, - }, - Size: uploadLength, - Storage: map[string]string{ - "SpaceRoot": n.SpaceRoot.ID, - "SpaceOwnerOrManager": n.SpaceOwnerOrManager(ctx).GetOpaqueId(), - }, + if err := events.Publish(ctx, fs.stream, events.BytesReceived{ + UploadID: info.ID, + URL: s, + SpaceOwner: n.SpaceOwnerOrManager(ctx), + ExecutingUser: user, + ResourceID: &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}, + Filename: info.MetaData[tus.CS3Prefix+"filename"], + Filesize: uint64(info.Size), + }); err != nil { + return err + } } - if metadata != nil { - info.MetaData["providerID"] = metadata["providerID"] - if mtime, ok := metadata["mtime"]; ok { - if mtime != "null" { - 
info.MetaData["mtime"] = mtime
+	sizeDiff := info.Size - n.Blobsize
+	if !fs.o.AsyncFileUploads {
+		// handle postprocessing synchronously
+		err = upload.Finalize(ctx, fs.blobstore, info, n) // moving or copying the blob only reads the blobid, no need to change the revision node's node id
+		upload.Cleanup(ctx, fs.lu, n, info, err != nil)
+		if tup, ok := up.(tusd.TerminatableUpload); ok {
+			terr := tup.Terminate(ctx)
+			if terr != nil {
+				log.Error().Err(terr).Msg("failed to terminate upload")
			}
		}
-		if expiration, ok := metadata["expires"]; ok {
-			if expiration != "null" {
-				info.MetaData["expires"] = expiration
-			}
+		if err != nil {
+			log.Error().Err(err).Msg("failed to upload")
+			return err
		}
-		if _, ok := metadata["sizedeferred"]; ok {
-			info.SizeIsDeferred = true
+		sizeDiff, err = upload.SetNodeToRevision(ctx, fs.lu, n, info.MetaData[tus.CS3Prefix+"RevisionTime"])
+		if err != nil {
+			log.Error().Err(err).Msg("failed to update node to revision")
+			return err
		}
-		if checksum, ok := metadata["checksum"]; ok {
-			parts := strings.SplitN(checksum, " ", 2)
-			if len(parts) != 2 {
-				return nil, errtypes.BadRequest("invalid checksum format. must be '[algorithm] [checksum]'")
-			}
-			switch parts[0] {
-			case "sha1", "md5", "adler32":
-				info.MetaData["checksum"] = checksum
-			default:
-				return nil, errtypes.BadRequest("unsupported checksum algorithm: " + parts[0])
-			}
+	}
+
+	return fs.tp.Propagate(ctx, n, sizeDiff)
+}
+
+// downloadURL returns a URL to download an upload
+func (fs *Decomposedfs) downloadURL(_ context.Context, id string) (string, error) {
+	type transferClaims struct {
+		jwt.StandardClaims
+		Target string `json:"target"`
+	}
+
+	u, err := url.JoinPath(fs.o.Tokens.DownloadEndpoint, "tus/", id)
+	if err != nil {
+		return "", errors.Wrapf(err, "error joining URL path")
+	}
+	ttl := time.Duration(fs.o.Tokens.TransferExpires) * time.Second
+	claims := transferClaims{
+		StandardClaims: jwt.StandardClaims{
+			ExpiresAt: time.Now().Add(ttl).Unix(),
+			Audience:  "reva",
+			IssuedAt:  time.Now().Unix(),
+		},
+		Target: u,
+	}
+
+	t := jwt.NewWithClaims(jwt.GetSigningMethod("HS256"), claims)
+
+	tkn, err := t.SignedString([]byte(fs.o.Tokens.TransferSharedSecret))
+	if err != nil {
+		return "", errors.Wrapf(err, "error signing token with claims %+v", claims)
+	}
+
+	return url.JoinPath(fs.o.Tokens.DataGatewayEndpoint, tkn)
+}
+
+func checkHash(expected string, h hash.Hash) error {
+	if expected != hex.EncodeToString(h.Sum(nil)) {
+		return errtypes.ChecksumMismatch(fmt.Sprintf("invalid checksum: expected %s got %x", expected, h.Sum(nil)))
+	}
+	return nil
+}
+
+// Upload uploads data to the given resource.
+// It is used by the simple datatx, after an InitiateUpload call.
+// TODO Upload (and InitiateUpload) needs a way to receive the expected checksum.
+// Maybe in metadata as 'checksum' => 'sha1 aeosvp45w5xaeoe' = lowercase, space separated?
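// Editor's aside (minimal sketch, not part of this patch): how a consumer of
// the token minted by downloadURL above could verify it, assuming the same
// HS256 method and shared secret. `tokenString`, `secret` and the claims
// struct mirroring transferClaims are placeholders.
//
//	claims := &transferClaims{}
//	token, err := jwt.ParseWithClaims(tokenString, claims, func(t *jwt.Token) (interface{}, error) {
//		if _, ok := t.Method.(*jwt.SigningMethodHMAC); !ok {
//			return nil, fmt.Errorf("unexpected signing method %v", t.Header["alg"])
//		}
//		return []byte(secret), nil
//	})
//	if err != nil || !token.Valid {
//		// reject the transfer
//	}
//	// claims.Target now carries the URL the bearer may download from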
+func (fs *Decomposedfs) Upload(ctx context.Context, req storage.UploadRequest, uff storage.UploadFinishedFunc) (provider.ResourceInfo, error) {
+	up, err := fs.tusDataStore.GetUpload(ctx, req.Ref.GetPath())
+	if err != nil {
+		return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error retrieving upload")
+	}
+
+	uploadInfo, _ := up.GetInfo(ctx)
+
+	p := uploadInfo.MetaData[tus.CS3Prefix+"chunk"]
+	if chunking.IsChunked(p) { // check chunking v1
+		var assembledFile string
+		p, assembledFile, err = fs.chunkHandler.WriteChunk(p, req.Body)
+		if err != nil {
+			return provider.ResourceInfo{}, err
+		}
+		if p == "" {
+			return provider.ResourceInfo{}, errtypes.PartialContent(req.Ref.String())
+		}
+		fd, err := os.Open(assembledFile)
+		if err != nil {
+			return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error opening assembled file")
+		}
+		defer fd.Close()
+		defer os.RemoveAll(assembledFile)
+
+		chunkStat, err := fd.Stat()
+		if err != nil {
+			return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: could not stat assembledFile for legacy chunking")
+		}
+
+		// fake a new upload with the correct size
+		newInfo := tusd.FileInfo{
+			Size:     chunkStat.Size(),
+			MetaData: uploadInfo.MetaData,
+		}
+		nup, err := fs.tusDataStore.NewUpload(ctx, newInfo)
+		if err != nil {
+			return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: could not create new tus upload for legacy chunking")
		}
-	// only check preconditions if they are not empty // TODO or is this a bad request?
-	if metadata["if-match"] != "" {
-		info.MetaData["if-match"] = metadata["if-match"]
+		_, err = nup.WriteChunk(ctx, 0, fd)
+		if err != nil {
+			return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error writing to binary file for legacy chunking")
+		}
+		// use new upload and info
+		up = nup
+		uploadInfo, err = up.GetInfo(ctx)
+		if err != nil {
+			return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: could not get info for legacy chunking")
		}
-	if metadata["if-none-match"] != "" {
-		info.MetaData["if-none-match"] = metadata["if-none-match"]
+	} else {
+		bytesWritten, err := up.WriteChunk(ctx, 0, req.Body)
+		if err != nil {
+			return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error writing to binary file")
		}
-	if metadata["if-unmodified-since"] != "" {
-		info.MetaData["if-unmodified-since"] = metadata["if-unmodified-since"]
+		if uploadInfo.SizeIsDeferred {
+			// update the size
+			uploadInfo.Size = bytesWritten
		}
	}
-	log.Debug().Interface("info", info).Interface("node", n).Interface("metadata", metadata).Msg("Decomposedfs: resolved filename")
+	// This finishes the tus upload
+	if err := up.FinishUpload(ctx); err != nil {
+		return provider.ResourceInfo{}, err
+	}
-	_, err = node.CheckQuota(ctx, n.SpaceRoot, n.Exists, uint64(n.Blobsize), uint64(info.Size))
+	// we now need to handle the move (or copy & delete) to the target blobstore
+	err = fs.PreFinishResponseCallback(tusd.HookEvent{Upload: uploadInfo})
	if err != nil {
-		return nil, err
+		return provider.ResourceInfo{}, err
	}
-	upload, err := fs.NewUpload(ctx, info)
+	n, err := upload.ReadNode(ctx, fs.lu, uploadInfo)
	if err != nil {
-		return nil, err
+		return provider.ResourceInfo{}, err
	}
-	info, _ = upload.GetInfo(ctx)
+	if uff != nil {
+		// TODO search needs to index the full path, so we return a reference relative to the space root.
+		// but then the search has to walk the path. it might be more efficient if search called GetPath itself ...
or we send the path as additional metadata in the event
+		uploadRef := &provider.Reference{
+			ResourceId: &provider.ResourceId{
+				StorageId: uploadInfo.MetaData[tus.CS3Prefix+"providerID"],
+				SpaceId:   n.SpaceID,
+				OpaqueId:  n.SpaceID,
+			},
+			Path: utils.MakeRelativePath(filepath.Join(uploadInfo.MetaData[tus.CS3Prefix+"dir"], uploadInfo.MetaData[tus.CS3Prefix+"filename"])),
+		}
+		executant, ok := ctxpkg.ContextGetUser(ctx)
+		if !ok {
+			return provider.ResourceInfo{}, errtypes.PreconditionFailed("error getting user from context")
+		}

-	return map[string]string{
-		"simple": info.ID,
-		"tus":    info.ID,
-	}, nil
-}
+		uff(n.SpaceOwnerOrManager(ctx), executant.Id, uploadRef)
+	}

-// UseIn tells the tus upload middleware which extensions it supports.
-func (fs *Decomposedfs) UseIn(composer *tusd.StoreComposer) {
-	composer.UseCore(fs)
-	composer.UseTerminater(fs)
-	composer.UseConcater(fs)
-	composer.UseLengthDeferrer(fs)
-}
+	mtime, err := n.GetMTime(ctx)
+	if err != nil {
+		return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error getting mtime for '"+n.ID+"'")
+	}
+	etag, err := node.CalculateEtag(n, mtime)
+	if err != nil {
+		return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error calculating etag '"+n.ID+"'")
+	}
+	ri := provider.ResourceInfo{
+		// fill with at least fileid, mtime and etag
+		Id: &provider.ResourceId{
+			StorageId: uploadInfo.MetaData[tus.CS3Prefix+"providerID"],
+			SpaceId:   n.SpaceID,
+			OpaqueId:  n.ID,
+		},
+		Etag: etag,
+	}

-// To implement the core tus.io protocol as specified in https://tus.io/protocols/resumable-upload.html#core-protocol
-// - the storage needs to implement NewUpload and GetUpload
-// - the upload needs to implement the tusd.Upload interface: WriteChunk, GetInfo, GetReader and FinishUpload
+	if mtime, err := utils.MTimeToTS(uploadInfo.MetaData[tus.TusPrefix+"mtime"]); err == nil {
+		ri.Mtime = &mtime
+	}

-// NewUpload returns a new tus Upload instance
-func (fs *Decomposedfs) NewUpload(ctx context.Context, info tusd.FileInfo) (tusd.Upload, error) {
-	return upload.New(ctx, info, fs.lu, fs.tp, fs.p, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens)
+	return ri, nil
}

-// GetUpload returns the Upload for the given upload id
-func (fs *Decomposedfs) GetUpload(ctx context.Context, id string) (tusd.Upload, error) {
-	return upload.Get(ctx, id, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens)
-}
+// FIXME all the functions below need a dedicated package ... the tusd datastore interface has no way of listing uploads, so we need to extend it
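// Editor's aside (hypothetical sketch, not an existing tusd interface): the
// extension the FIXME above hints at. tusd's DataStore cannot enumerate
// uploads, so listing would have to be an additional, optional capability:
//
//	type ListableDataStore interface {
//		tusd.DataStore
//		// ListUploads returns the FileInfo of all incomplete uploads
//		ListUploads(ctx context.Context) ([]tusd.FileInfo, error)
//	}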
// ListUploads returns a list of all incomplete uploads
func (fs *Decomposedfs) ListUploads() ([]tusd.FileInfo, error) {
@@ -256,13 +570,13 @@ func (fs *Decomposedfs) PurgeExpiredUploads(purgedChan chan<- tusd.FileInfo) err
	}

	for _, info := range infos {
-		expires, err := strconv.Atoi(info.MetaData["expires"])
+		expires, err := strconv.Atoi(info.MetaData[tus.CS3Prefix+"expires"])
		if err != nil {
			continue
		}
		if int64(expires) < time.Now().Unix() {
			purgedChan <- info
-			err = os.Remove(info.Storage["BinPath"])
+			err = os.Remove(info.Storage["BinPath"]) // FIXME
			if err != nil {
				return err
			}
@@ -275,30 +589,9 @@ func (fs *Decomposedfs) PurgeExpiredUploads(purgedChan chan<- tusd.FileInfo) err
	return nil
}

-// AsTerminatableUpload returns a TerminatableUpload
-// To implement the termination extension as specified in https://tus.io/protocols/resumable-upload.html#termination
-// the storage needs to implement AsTerminatableUpload
-func (fs *Decomposedfs) AsTerminatableUpload(up tusd.Upload) tusd.TerminatableUpload {
-	return up.(*upload.Upload)
-}
-
-// AsLengthDeclarableUpload returns a LengthDeclarableUpload
-// To implement the creation-defer-length extension as specified in https://tus.io/protocols/resumable-upload.html#creation
-// the storage needs to implement AsLengthDeclarableUpload
-func (fs *Decomposedfs) AsLengthDeclarableUpload(up tusd.Upload) tusd.LengthDeclarableUpload {
-	return up.(*upload.Upload)
-}
-
-// AsConcatableUpload returns a ConcatableUpload
-// To implement the concatenation extension as specified in https://tus.io/protocols/resumable-upload.html#concatenation
-// the storage needs to implement AsConcatableUpload
-func (fs *Decomposedfs) AsConcatableUpload(up tusd.Upload) tusd.ConcatableUpload {
-	return up.(*upload.Upload)
-}
-
func (fs *Decomposedfs) uploadInfos(ctx context.Context) ([]tusd.FileInfo, error) {
	infos := []tusd.FileInfo{}
-	infoFiles, err := filepath.Glob(filepath.Join(fs.o.Root, "uploads", "*.info"))
+	infoFiles, err := filepath.Glob(filepath.Join(fs.o.Root, "uploads", "*.info")) // FIXME
	if err != nil {
		return nil, err
	}
@@ -308,7 +601,7 @@ func (fs *Decomposedfs) uploadInfos(ctx context.Context) ([]tusd.FileInfo, error
		if match == nil || len(match) < 2 {
			continue
		}
-		up, err := fs.GetUpload(ctx, match[1])
+		up, err := fs.tusDataStore.GetUpload(ctx, match[1])
		if err != nil {
			return nil, err
		}
diff --git a/pkg/storage/utils/decomposedfs/upload/processing.go b/pkg/storage/utils/decomposedfs/upload/processing.go
index d70f7f44a8..3b95f651f3 100644
--- a/pkg/storage/utils/decomposedfs/upload/processing.go
+++ b/pkg/storage/utils/decomposedfs/upload/processing.go
@@ -20,476 +20,334 @@ package upload

 import (
 	"context"
-	"encoding/json"
-	"fmt"
-	iofs "io/fs"
-	"os"
 	"path/filepath"
-	"strconv"
-	"strings"
 	"time"

-	userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
+	user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
 	provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
-	"github.com/cs3org/reva/v2/pkg/appctx"
-	ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
-	"github.com/cs3org/reva/v2/pkg/errtypes"
 	"github.com/cs3org/reva/v2/pkg/events"
 	"github.com/cs3org/reva/v2/pkg/logger"
-	"github.com/cs3org/reva/v2/pkg/storage/utils/chunking"
+	"github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/tus"
+	"github.com/cs3org/reva/v2/pkg/storage/cache"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" - "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" - "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" - "github.com/cs3org/reva/v2/pkg/storagespace" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree" "github.com/cs3org/reva/v2/pkg/utils" - "github.com/google/uuid" - "github.com/pkg/errors" - "github.com/rogpeppe/go-internal/lockedfile" tusd "github.com/tus/tusd/pkg/handler" ) -var defaultFilePerm = os.FileMode(0664) - // PermissionsChecker defines an interface for checking permissions on a Node type PermissionsChecker interface { AssemblePermissions(ctx context.Context, n *node.Node) (ap provider.ResourcePermissions, err error) } -// New returns a new processing instance -func New(ctx context.Context, info tusd.FileInfo, lu *lookup.Lookup, tp Tree, p PermissionsChecker, fsRoot string, pub events.Publisher, async bool, tknopts options.TokenOptions) (upload *Upload, err error) { - - log := appctx.GetLogger(ctx) - log.Debug().Interface("info", info).Msg("Decomposedfs: NewUpload") - - if info.MetaData["filename"] == "" { - return nil, errors.New("Decomposedfs: missing filename in metadata") - } - if info.MetaData["dir"] == "" { - return nil, errors.New("Decomposedfs: missing dir in metadata") - } - - n, err := lu.NodeFromSpaceID(ctx, info.Storage["SpaceRoot"]) - if err != nil { - return nil, errors.Wrap(err, "Decomposedfs: error getting space root node") - } - - n, err = lookupNode(ctx, n, filepath.Join(info.MetaData["dir"], info.MetaData["filename"]), lu) - if err != nil { - return nil, errors.Wrap(err, "Decomposedfs: error walking path") - } - - log.Debug().Interface("info", info).Interface("node", n).Msg("Decomposedfs: resolved filename") - - // the parent owner will become the new owner - parent, perr := n.Parent(ctx) - if perr != nil { - return nil, errors.Wrap(perr, "Decomposedfs: error getting parent "+n.ParentID) - } - - // check permissions - var ( - checkNode *node.Node - path string - ) - if n.Exists { - // check permissions of file to be overwritten - checkNode = n - path, _ = storagespace.FormatReference(&provider.Reference{ResourceId: &provider.ResourceId{ - SpaceId: checkNode.SpaceID, - OpaqueId: checkNode.ID, - }}) - } else { - // check permissions of parent - checkNode = parent - path, _ = storagespace.FormatReference(&provider.Reference{ResourceId: &provider.ResourceId{ - SpaceId: checkNode.SpaceID, - OpaqueId: checkNode.ID, - }, Path: n.Name}) - } - rp, err := p.AssemblePermissions(ctx, checkNode) - switch { - case err != nil: - return nil, err - case !rp.InitiateFileUpload: - return nil, errtypes.PermissionDenied(path) - } - - // are we trying to overwriting a folder with a file? 
- if n.Exists && n.IsDir(ctx) { - return nil, errtypes.PreconditionFailed("resource is not a file") - } - - // check lock - if info.MetaData["lockid"] != "" { - ctx = ctxpkg.ContextSetLockID(ctx, info.MetaData["lockid"]) - } - if err := n.CheckLock(ctx); err != nil { - return nil, err - } - - info.ID = uuid.New().String() - - binPath := filepath.Join(fsRoot, "uploads", info.ID) - usr := ctxpkg.ContextMustGetUser(ctx) - - var ( - spaceRoot string - ok bool - ) - if info.Storage != nil { - if spaceRoot, ok = info.Storage["SpaceRoot"]; !ok { - spaceRoot = n.SpaceRoot.ID - } - } else { - spaceRoot = n.SpaceRoot.ID - } - - info.Storage = map[string]string{ - "Type": "OCISStore", - "BinPath": binPath, - - "NodeId": n.ID, - "NodeExists": "true", - "NodeParentId": n.ParentID, - "NodeName": n.Name, - "SpaceRoot": spaceRoot, - "SpaceOwnerOrManager": info.Storage["SpaceOwnerOrManager"], - - "Idp": usr.Id.Idp, - "UserId": usr.Id.OpaqueId, - "UserType": utils.UserTypeToString(usr.Id.Type), - "UserName": usr.Username, - - "LogLevel": log.GetLevel().String(), - } - if !n.Exists { - // fill future node info - info.Storage["NodeId"] = uuid.New().String() - info.Storage["NodeExists"] = "false" - } - if info.MetaData["if-none-match"] == "*" && info.Storage["NodeExists"] == "true" { - return nil, errtypes.Aborted(fmt.Sprintf("parent %s already has a child %s", n.ID, n.Name)) - } - // Create binary file in the upload folder with no content - log.Debug().Interface("info", info).Msg("Decomposedfs: built storage info") - file, err := os.OpenFile(binPath, os.O_CREATE|os.O_WRONLY, defaultFilePerm) - if err != nil { - return nil, err - } - defer file.Close() - - u := buildUpload(ctx, info, binPath, filepath.Join(fsRoot, "uploads", info.ID+".info"), lu, tp, pub, async, tknopts) - - // writeInfo creates the file by itself if necessary - err = u.writeInfo() - if err != nil { - return nil, err - } - - return u, nil -} - -// Get returns the Upload for the given upload id -func Get(ctx context.Context, id string, lu *lookup.Lookup, tp Tree, fsRoot string, pub events.Publisher, async bool, tknopts options.TokenOptions) (*Upload, error) { - infoPath := filepath.Join(fsRoot, "uploads", id+".info") - - info := tusd.FileInfo{} - data, err := os.ReadFile(infoPath) - if err != nil { - if errors.Is(err, iofs.ErrNotExist) { - // Interpret os.ErrNotExist as 404 Not Found - err = tusd.ErrNotFound - } - return nil, err - } - if err := json.Unmarshal(data, &info); err != nil { - return nil, err - } - - stat, err := os.Stat(info.Storage["BinPath"]) - if err != nil { - return nil, err - } - - info.Offset = stat.Size() - - u := &userpb.User{ - Id: &userpb.UserId{ - Idp: info.Storage["Idp"], - OpaqueId: info.Storage["UserId"], - Type: utils.UserTypeMap(info.Storage["UserType"]), - }, - Username: info.Storage["UserName"], - } - - ctx = ctxpkg.ContextSetUser(ctx, u) - // TODO configure the logger the same way ... store and add traceid in file info - - var opts []logger.Option - opts = append(opts, logger.WithLevel(info.Storage["LogLevel"])) - opts = append(opts, logger.WithWriter(os.Stderr, logger.ConsoleMode)) - l := logger.New(opts...) 
- - sub := l.With().Int("pid", os.Getpid()).Logger() - - ctx = appctx.WithLogger(ctx, &sub) - - up := buildUpload(ctx, info, info.Storage["BinPath"], infoPath, lu, tp, pub, async, tknopts) - up.versionsPath = info.MetaData["versionsPath"] - up.SizeDiff, _ = strconv.ParseInt(info.MetaData["sizeDiff"], 10, 64) - return up, nil +type Propagator interface { + Propagate(ctx context.Context, node *node.Node, sizeDiff int64) (err error) } -// CreateNodeForUpload will create the target node for the Upload -func CreateNodeForUpload(upload *Upload, initAttrs node.Attributes) (*node.Node, error) { - ctx, span := tracer.Start(upload.Ctx, "CreateNodeForUpload") +// Postprocessing starts the postprocessing result collector +func Postprocessing(lu *lookup.Lookup, propagator Propagator, cache cache.StatCache, es events.Stream, tusDataStore tusd.DataStore, blobstore tree.Blobstore, downloadURLfunc func(ctx context.Context, id string) (string, error), ch <-chan events.Event) { + ctx := context.TODO() // we should pass the trace id in the event and initialize the trace provider here + ctx, span := tracer.Start(ctx, "Postprocessing") defer span.End() - _, subspan := tracer.Start(ctx, "os.Stat") - fi, err := os.Stat(upload.binPath) - subspan.End() - if err != nil { - return nil, err - } - - fsize := fi.Size() - spaceID := upload.Info.Storage["SpaceRoot"] - n := node.New( - spaceID, - upload.Info.Storage["NodeId"], - upload.Info.Storage["NodeParentId"], - upload.Info.Storage["NodeName"], - fsize, - upload.Info.ID, - provider.ResourceType_RESOURCE_TYPE_FILE, - nil, - upload.lu, - ) - n.SpaceRoot, err = node.ReadNode(ctx, upload.lu, spaceID, spaceID, false, nil, false) - if err != nil { - return nil, err - } - - // check lock - if err := n.CheckLock(ctx); err != nil { - return nil, err - } - - var f *lockedfile.File - switch upload.Info.Storage["NodeExists"] { - case "false": - f, err = initNewNode(upload, n, uint64(fsize)) - if f != nil { - appctx.GetLogger(upload.Ctx).Info().Str("lockfile", f.Name()).Interface("err", err).Msg("got lock file from initNewNode") - } - default: - f, err = updateExistingNode(upload, n, spaceID, uint64(fsize)) - if f != nil { - appctx.GetLogger(upload.Ctx).Info().Str("lockfile", f.Name()).Interface("err", err).Msg("got lock file from updateExistingNode") - } - } - defer func() { - if f == nil { - return - } - if err := f.Close(); err != nil { - appctx.GetLogger(upload.Ctx).Error().Err(err).Str("nodeid", n.ID).Str("parentid", n.ParentID).Msg("could not close lock") - } - }() - if err != nil { - return nil, err - } - - mtime := time.Now() - if upload.Info.MetaData["mtime"] != "" { - // overwrite mtime if requested - mtime, err = utils.MTimeToTime(upload.Info.MetaData["mtime"]) - if err != nil { - return nil, err - } - } - - // overwrite technical information - initAttrs.SetString(prefixes.MTimeAttr, mtime.UTC().Format(time.RFC3339Nano)) - initAttrs.SetInt64(prefixes.TypeAttr, int64(provider.ResourceType_RESOURCE_TYPE_FILE)) - initAttrs.SetString(prefixes.ParentidAttr, n.ParentID) - initAttrs.SetString(prefixes.NameAttr, n.Name) - initAttrs.SetString(prefixes.BlobIDAttr, n.BlobID) - initAttrs.SetInt64(prefixes.BlobsizeAttr, n.Blobsize) - initAttrs.SetString(prefixes.StatusPrefix, node.ProcessingStatus+upload.Info.ID) - - // update node metadata with new blobid etc - err = n.SetXattrsWithContext(ctx, initAttrs, false) - if err != nil { - return nil, errors.Wrap(err, "Decomposedfs: could not write metadata") - } - - // add etag to metadata - upload.Info.MetaData["etag"], _ = 
node.CalculateEtag(n, mtime) - - // update nodeid for later - upload.Info.Storage["NodeId"] = n.ID - if err := upload.writeInfo(); err != nil { - return nil, err - } - - return n, nil -} - -func initNewNode(upload *Upload, n *node.Node, fsize uint64) (*lockedfile.File, error) { - // create folder structure (if needed) - if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil { - return nil, err - } - - // create and write lock new node metadata - f, err := lockedfile.OpenFile(upload.lu.MetadataBackend().LockfilePath(n.InternalPath()), os.O_RDWR|os.O_CREATE, 0600) - if err != nil { - return nil, err - } - - // we also need to touch the actual node file here it stores the mtime of the resource - h, err := os.OpenFile(n.InternalPath(), os.O_CREATE|os.O_EXCL, 0600) - if err != nil { - return f, err - } - h.Close() - - if _, err := node.CheckQuota(upload.Ctx, n.SpaceRoot, false, 0, fsize); err != nil { - return f, err - } - - // link child name to parent if it is new - childNameLink := filepath.Join(n.ParentPath(), n.Name) - relativeNodePath := filepath.Join("../../../../../", lookup.Pathify(n.ID, 4, 2)) - log := appctx.GetLogger(upload.Ctx).With().Str("childNameLink", childNameLink).Str("relativeNodePath", relativeNodePath).Logger() - log.Info().Msg("initNewNode: creating symlink") - - if err = os.Symlink(relativeNodePath, childNameLink); err != nil { - log.Info().Err(err).Msg("initNewNode: symlink failed") - if errors.Is(err, iofs.ErrExist) { - log.Info().Err(err).Msg("initNewNode: symlink already exists") - return f, errtypes.AlreadyExists(n.Name) - } - return f, errors.Wrap(err, "Decomposedfs: could not symlink child entry") - } - log.Info().Msg("initNewNode: symlink created") - - // on a new file the sizeDiff is the fileSize - upload.SizeDiff = int64(fsize) - upload.Info.MetaData["sizeDiff"] = strconv.Itoa(int(upload.SizeDiff)) - return f, nil -} - -func updateExistingNode(upload *Upload, n *node.Node, spaceID string, fsize uint64) (*lockedfile.File, error) { - targetPath := n.InternalPath() - - // write lock existing node before reading any metadata - f, err := lockedfile.OpenFile(upload.lu.MetadataBackend().LockfilePath(targetPath), os.O_RDWR|os.O_CREATE, 0600) - if err != nil { - return nil, err - } + log := logger.New() + for event := range ch { + switch ev := event.Event.(type) { + case events.PostprocessingFinished: + up, err := tusDataStore.GetUpload(ctx, ev.UploadID) + if err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload") + continue // NOTE: since we can't get the upload, we can't delete the blob + } + info, err := up.GetInfo(ctx) + if err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload info") + continue // NOTE: since we can't get the upload, we can't delete the blob + } - old, _ := node.ReadNode(upload.Ctx, upload.lu, spaceID, n.ID, false, nil, false) - if _, err := node.CheckQuota(upload.Ctx, n.SpaceRoot, true, uint64(old.Blobsize), fsize); err != nil { - return f, err - } + var ( + failed bool + keepUpload bool + ) + + var sizeDiff int64 + // propagate sizeDiff after failed postprocessing + + n, err := ReadNode(ctx, lu, info) + if err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID). + Str("space", info.MetaData[tus.CS3Prefix+"SpaceRoot"]). + Str("parent", info.MetaData[tus.CS3Prefix+"NodeParentId"]). + Str("node", info.MetaData[tus.CS3Prefix+"NodeId"]). + Str("revision", info.MetaData[tus.CS3Prefix+"RevisionTime"]). 
+ Str("name", info.MetaData[tus.CS3Prefix+"filename"]). + Msg("could not read revision node") + continue + } - oldNodeMtime, err := old.GetMTime(upload.Ctx) - if err != nil { - return f, err - } - oldNodeEtag, err := node.CalculateEtag(old, oldNodeMtime) - if err != nil { - return f, err - } + switch ev.Outcome { + default: + log.Error().Str("outcome", string(ev.Outcome)).Str("uploadID", ev.UploadID).Msg("unknown postprocessing outcome - aborting") + fallthrough + case events.PPOutcomeAbort: + failed = true + keepUpload = true + case events.PPOutcomeContinue: + if err := Finalize(ctx, blobstore, info, n); err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not finalize upload") + keepUpload = true // should we keep the upload when assembling failed? + failed = true + } + sizeDiff, err = SetNodeToRevision(ctx, lu, n, info.MetaData[tus.CS3Prefix+"RevisionTime"]) + if err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could set node to revision upload") + keepUpload = true // should we keep the upload when assembling failed? + failed = true + } + case events.PPOutcomeDelete: + failed = true + } - // When the if-match header was set we need to check if the - // etag still matches before finishing the upload. - if ifMatch, ok := upload.Info.MetaData["if-match"]; ok { - if ifMatch != oldNodeEtag { - return f, errtypes.Aborted("etag mismatch") - } - } + getParent := func() *node.Node { + p, err := n.Parent(ctx) + if err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not read parent") + return nil + } + return p + } - // When the if-none-match header was set we need to check if any of the - // etags matches before finishing the upload. - if ifNoneMatch, ok := upload.Info.MetaData["if-none-match"]; ok { - if ifNoneMatch == "*" { - return f, errtypes.Aborted("etag mismatch, resource exists") - } - for _, ifNoneMatchTag := range strings.Split(ifNoneMatch, ",") { - if ifNoneMatchTag == oldNodeEtag { - return f, errtypes.Aborted("etag mismatch") + now := time.Now() + if failed { + // propagate sizeDiff after failed postprocessing + if err := propagator.Propagate(ctx, n, -sizeDiff); err != nil { // FIXME revert sizediff .,.. and write an issue that condemns this + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not propagate tree size change") + } + + } else if p := getParent(); p != nil { + // update parent tmtime to propagate etag change after successful postprocessing + _ = p.SetTMTime(ctx, &now) + if err := propagator.Propagate(ctx, p, 0); err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not propagate etag change") + } } - } - } - // When the if-unmodified-since header was set we need to check if the - // etag still matches before finishing the upload. 
- if ifUnmodifiedSince, ok := upload.Info.MetaData["if-unmodified-since"]; ok { - if err != nil { - return f, errtypes.InternalError(fmt.Sprintf("failed to read mtime of node: %s", err)) - } - ifUnmodifiedSince, err := time.Parse(time.RFC3339Nano, ifUnmodifiedSince) - if err != nil { - return f, errtypes.InternalError(fmt.Sprintf("failed to parse if-unmodified-since time: %s", err)) - } + Cleanup(ctx, lu, n, info, failed) + if !keepUpload { + if tup, ok := up.(tusd.TerminatableUpload); ok { + terr := tup.Terminate(ctx) + if terr != nil { + log.Error().Err(terr).Msg("failed to terminate upload") + } + } + } - if oldNodeMtime.After(ifUnmodifiedSince) { - return f, errtypes.Aborted("if-unmodified-since mismatch") - } - } + // remove cache entry in gateway + cache.RemoveStatContext(ctx, ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) + + if err := events.Publish( + ctx, + es, + events.UploadReady{ + UploadID: ev.UploadID, + Failed: failed, + ExecutingUser: &user.User{ + Id: &user.UserId{ + Type: user.UserType(user.UserType_value[info.MetaData[tus.CS3Prefix+"ExecutantType"]]), + Idp: info.MetaData[tus.CS3Prefix+"ExecutantIdp"], + OpaqueId: info.MetaData[tus.CS3Prefix+"ExecutantId"], + }, + Username: info.MetaData[tus.CS3Prefix+"ExecutantUserName"], + }, + Filename: ev.Filename, + FileRef: &provider.Reference{ + ResourceId: &provider.ResourceId{ + StorageId: info.MetaData[tus.CS3Prefix+"providerID"], + SpaceId: info.MetaData[tus.CS3Prefix+"SpaceRoot"], + OpaqueId: info.MetaData[tus.CS3Prefix+"SpaceRoot"], + }, + // FIXME this seems wrong, path is not really relative to space root + // actually it is: InitiateUpload calls fs.lu.Path to get the path relative to the root so soarch can index the path + // hm is that robust? what if the file is moved? shouldn't we store the parent id, then? + Path: utils.MakeRelativePath(filepath.Join(info.MetaData[tus.CS3Prefix+"dir"], info.MetaData[tus.CS3Prefix+"filename"])), + }, + Timestamp: utils.TimeToTS(now), + SpaceOwner: n.SpaceOwnerOrManager(ctx), + }, + ); err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish UploadReady event") + } + case events.RestartPostprocessing: + up, err := tusDataStore.GetUpload(ctx, ev.UploadID) + if err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload") + continue // NOTE: since we can't get the upload, we can't restart postprocessing + } + info, err := up.GetInfo(ctx) + if err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload info") + continue // NOTE: since we can't get the upload, we can't restart postprocessing + } - upload.versionsPath = upload.lu.InternalPath(spaceID, n.ID+node.RevisionIDDelimiter+oldNodeMtime.UTC().Format(time.RFC3339Nano)) - upload.SizeDiff = int64(fsize) - old.Blobsize - upload.Info.MetaData["versionsPath"] = upload.versionsPath - upload.Info.MetaData["sizeDiff"] = strconv.Itoa(int(upload.SizeDiff)) + n, err := ReadNode(ctx, lu, info) + if err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID). + Str("space", info.MetaData[tus.CS3Prefix+"SpaceRoot"]). + Str("parent", info.MetaData[tus.CS3Prefix+"NodeParentId"]). + Str("node", info.MetaData[tus.CS3Prefix+"NodeId"]). + Str("revision", info.MetaData[tus.CS3Prefix+"RevisionTime"]). + Str("name", info.MetaData[tus.CS3Prefix+"filename"]). 
+ Msg("could not read revision node") + continue + } - // create version node - if _, err := os.Create(upload.versionsPath); err != nil { - return f, err - } + s, err := downloadURLfunc(ctx, ev.UploadID) + if err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not create url") + continue + } + // restart postprocessing + if err := events.Publish(ctx, es, events.BytesReceived{ + UploadID: info.ID, + URL: s, + SpaceOwner: n.SpaceOwnerOrManager(ctx), + ExecutingUser: &user.User{Id: &user.UserId{OpaqueId: "postprocessing-restart"}}, // send nil instead? + ResourceID: &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}, + Filename: info.MetaData[tus.CS3Prefix+"filename"], + Filesize: uint64(info.Size), + }); err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish BytesReceived event") + } + case events.PostprocessingStepFinished: + if ev.FinishedStep != events.PPStepAntivirus { + // atm we are only interested in antivirus results + continue + } - // copy blob metadata to version node - if err := upload.lu.CopyMetadataWithSourceLock(upload.Ctx, targetPath, upload.versionsPath, func(attributeName string, value []byte) (newValue []byte, copy bool) { - return value, strings.HasPrefix(attributeName, prefixes.ChecksumPrefix) || - attributeName == prefixes.TypeAttr || - attributeName == prefixes.BlobIDAttr || - attributeName == prefixes.BlobsizeAttr || - attributeName == prefixes.MTimeAttr - }, f, true); err != nil { - return f, err - } + res := ev.Result.(events.VirusscanResult) + if res.ErrorMsg != "" { + // scan failed somehow + // Should we handle this here? + continue + } - // keep mtime from previous version - if err := os.Chtimes(upload.versionsPath, oldNodeMtime, oldNodeMtime); err != nil { - return f, errtypes.InternalError(fmt.Sprintf("failed to change mtime of version node: %s", err)) - } + var n *node.Node + switch ev.UploadID { + case "": + // uploadid is empty -> this was an on-demand scan + /* ON DEMAND SCANNING NOT SUPPORTED ATM + ctx := ctxpkg.ContextSetUser(context.Background(), ev.ExecutingUser) + ref := &provider.Reference{ResourceId: ev.ResourceID} + + no, err := fs.lu.NodeFromResource(ctx, ref) + if err != nil { + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Failed to get node after scan") + continue + + } + n = no + if ev.Outcome == events.PPOutcomeDelete { + // antivir wants us to delete the file. We must obey and need to + + // check if there a previous versions existing + revs, err := fs.ListRevisions(ctx, ref) + if len(revs) == 0 { + if err != nil { + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Failed to list revisions. 
Fallback to delete file") + } + + // no versions -> trash file + err := fs.Delete(ctx, ref) + if err != nil { + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Failed to delete infected resource") + continue + } + + // now purge it from the recycle bin + if err := fs.PurgeRecycleItem(ctx, &provider.Reference{ResourceId: &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.SpaceID}}, n.ID, "/"); err != nil { + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Failed to purge infected resource from trash") + } + + // remove cache entry in gateway + fs.cache.RemoveStatContext(ctx, ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) + continue + } + + // we have versions - find the newest + versions := make(map[uint64]string) // remember all versions - we need them later + var nv uint64 + for _, v := range revs { + versions[v.Mtime] = v.Key + if v.Mtime > nv { + nv = v.Mtime + } + } + + // restore newest version + if err := fs.RestoreRevision(ctx, ref, versions[nv]); err != nil { + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Str("revision", versions[nv]).Msg("Failed to restore revision") + continue + } + + // now find infected version + revs, err = fs.ListRevisions(ctx, ref) + if err != nil { + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Error listing revisions after restore") + } + + for _, v := range revs { + // we looking for a version that was previously not there + if _, ok := versions[v.Mtime]; ok { + continue + } + + if err := fs.DeleteRevision(ctx, ref, v.Key); err != nil { + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Str("revision", v.Key).Msg("Failed to delete revision") + } + } + + // remove cache entry in gateway + fs.cache.RemoveStatContext(ctx, ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) + continue + } + */ + default: + // uploadid is not empty -> this is an async upload + up, err := tusDataStore.GetUpload(ctx, ev.UploadID) + if err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload") + continue + } + info, err := up.GetInfo(ctx) + if err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload info") + continue + } + + // scan data should be set on the node revision not the node ... then when postprocessing finishes we need to copy the state to the node + + n, err = ReadNode(ctx, lu, info) + if err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID). + Str("space", info.MetaData[tus.CS3Prefix+"SpaceRoot"]). + Str("parent", info.MetaData[tus.CS3Prefix+"NodeParentId"]). + Str("node", info.MetaData[tus.CS3Prefix+"NodeId"]). + Str("revision", info.MetaData[tus.CS3Prefix+"RevisionTime"]). + Str("name", info.MetaData[tus.CS3Prefix+"filename"]). + Msg("could not read revision node") + continue + } + } - return f, nil -} + if err := n.SetScanData(ctx, res.Description, res.Scandate); err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Interface("resourceID", res.ResourceID).Msg("Failed to set scan results") + continue + } -// lookupNode looks up nodes by path. -// This method can also handle lookups for paths which contain chunking information. 
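// Editor's aside (illustrative sketch, not part of this patch): wiring the
// Postprocessing collector above. All dependency names and the channel feeding
// it events are placeholders that would come from the service setup.
//
//	ch := make(chan events.Event)
//	go upload.Postprocessing(lu, propagator, statCache, stream, dataStore, blobstore, downloadURL, ch)
//	// the service then forwards PostprocessingFinished, RestartPostprocessing
//	// and PostprocessingStepFinished events from the bus into ch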
-func lookupNode(ctx context.Context, spaceRoot *node.Node, path string, lu *lookup.Lookup) (*node.Node, error) { - p := path - isChunked := chunking.IsChunked(path) - if isChunked { - chunkInfo, err := chunking.GetChunkBLOBInfo(path) - if err != nil { - return nil, err + // remove cache entry in gateway + cache.RemoveStatContext(ctx, ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) + default: + log.Error().Interface("event", ev).Msg("Unknown event") } - p = chunkInfo.Path - } - - n, err := lu.WalkPath(ctx, spaceRoot, p, true, func(ctx context.Context, n *node.Node) error { return nil }) - if err != nil { - return nil, errors.Wrap(err, "Decomposedfs: error walking path") - } - - if isChunked { - n.Name = filepath.Base(path) } - return n, nil } diff --git a/pkg/storage/utils/decomposedfs/upload/processing_test.go b/pkg/storage/utils/decomposedfs/upload/processing_test.go index facbf2cd53..53c67325c3 100644 --- a/pkg/storage/utils/decomposedfs/upload/processing_test.go +++ b/pkg/storage/utils/decomposedfs/upload/processing_test.go @@ -33,7 +33,7 @@ func TestInitNewNode(t *testing.T) { } n := node.New("e48c4e7a-beac-4b82-b991-a5cff7b8c39c", "930b7a2e-b745-41e1-8a9b-712582021842", "e48c4e7a-beac-4b82-b991-a5cff7b8c39c", "newchild", 10, "26493c53-2634-45f8-949f-dc07b88df9b0", providerv1beta1.ResourceType_RESOURCE_TYPE_FILE, &userv1beta1.UserId{}, lookup) n.SpaceRoot = rootNode - f, err := initNewNode(&Upload{Ctx: context.Background(), lu: lookup, Info: handler.FileInfo{MetaData: handler.MetaData{}}}, n, 10) + f, err := initNewNode(context.Background(), lookup, handler.FileInfo{MetaData: handler.MetaData{}}, n) if err != nil { t.Fatalf(err.Error()) } @@ -42,7 +42,7 @@ func TestInitNewNode(t *testing.T) { // try initializing the same new node again in case a concurrent requests tries to create a file with the same name n = node.New("e48c4e7a-beac-4b82-b991-a5cff7b8c39c", "a6ede986-cfcd-41c5-a820-6eee955a1c2b", "e48c4e7a-beac-4b82-b991-a5cff7b8c39c", "newchild", 10, "26493c53-2634-45f8-949f-dc07b88df9b0", providerv1beta1.ResourceType_RESOURCE_TYPE_FILE, &userv1beta1.UserId{}, lookup) n.SpaceRoot = rootNode - f2, err := initNewNode(&Upload{Ctx: context.Background(), lu: lookup, Info: handler.FileInfo{MetaData: handler.MetaData{}}}, n, 10) + f2, err := initNewNode(context.Background(), lookup, handler.FileInfo{MetaData: handler.MetaData{}}, n) if _, ok := err.(errtypes.IsAlreadyExists); !ok { t.Fatalf(`initNewNode(with same 'newchild' name), %v, want %v`, err, errtypes.AlreadyExists("newchild")) } diff --git a/pkg/storage/utils/decomposedfs/upload/upload.go b/pkg/storage/utils/decomposedfs/upload/upload.go index 89e4ab6597..7e2678d6f4 100644 --- a/pkg/storage/utils/decomposedfs/upload/upload.go +++ b/pkg/storage/utils/decomposedfs/upload/upload.go @@ -16,19 +16,58 @@ // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. +// Package upload handles the processing of uploads. +// In general this is the lifecycle of an upload from the perspective of a storageprovider: +// 1. To start an upload a client makes a call to InitializeUpload which will return protocols and urls that he can use to append bytes to the upload. +// 2. When the client has sent all bytes the tusd handler will call a PreFinishResponseCallback which marks the end of the transfer and the start of postprocessing. +// 3. 
When async uploads are enabled the storageprovider emits a BytesReceived event, otherwise a FileUploaded event, and the upload lifecycle ends.
+// 4. During async postprocessing the uploaded bytes might be read at the upload URL to determine the outcome of the postprocessing steps
+// 5. To handle async postprocessing the storageprovider has to listen to multiple events:
+//   - PostprocessingFinished determines what should happen with the upload:
+//     - abort - the upload is cancelled but the bytes are kept in the upload folder, eg. when antivirus scanning encounters an error
+//       then what? can the admin retrigger the upload?
+//     - continue - the upload is moved to its final destination (eventually being marked with pp results)
+//     - delete - the file and the upload should be deleted
+//   - RestartPostprocessing
+//   - PostprocessingStepFinished is used to set scan data on an upload
+//
+// 6. The storageprovider emits an UploadReady event that can be used by eg. the search or thumbnails services to update their metadata.
+//
+// There are two interesting scenarios:
+// 1. Two concurrent requests try to create the same file
+// 2. Two concurrent requests try to overwrite the same file
+// The first step to upload a file is making an InitiateUpload call to the storageprovider via CS3. It will return an upload id that can be used to append bytes to the upload.
+// With an upload id clients can append bytes to the upload.
+// When all bytes have been received tusd will call PreFinishResponseCallback on the storageprovider.
+// The storageprovider cannot use the tus upload metadata to persist a postprocessing status, so we have to store the processing status on a revision node.
+// On disk the layout for a node consists of the actual node metadata and revision nodes.
+// The revision nodes are used to capture the different revisions ...
+// * so every upload always creates a revision node first?
+// * and in PreFinishResponseCallback we update or create? the actual node? or do we create the node in the InitiateUpload call?
+// * We need to skip unfinished revisions when listing versions?
+// The size diff is always calculated when updating the node
+//
+// ## Client considerations
+// When do we propagate the etag? Currently, already when an upload is in postprocessing ... why? because we update the node when all bytes are transferred?
+// Does the client expect an etag change when it uploads a file? it should not ... sync and uploads are independent, or so it was last explained to me
+// postprocessing might change the content and thereby the etag
+//
+// When the client finishes transferring all bytes it gets the 'future' etag of the resource, which it currently stores as the etag for the file in its local db.
+// When the next propfind happens before postprocessing finishes, the client would see the old etag and download the old version. Then, when postprocessing causes
+// the next etag change, the client will download the file it previously uploaded.
+//
+// For the new file scenario, the desktop client would delete the uploaded file locally when it is not listed in the next propfind.
+//
+// The graph api exposes pending uploads explicitly using the pendingOperations property, which carries a pendingContentUpdate resource with a
+// queuedDateTime property: Date and time the pending binary operation was queued in UTC time. Read-only.
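// Editor's aside (abridged illustration of the graph API shape referenced
// above; the exact payload is an assumption based on the Microsoft Graph
// driveItem documentation):
//
//	"pendingOperations": {
//		"pendingContentUpdate": { "queuedDateTime": "2021-01-01T00:00:00Z" }
//	}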
+// +// So, until clients learn to keep track of their uploads we need to return 425 when an upload is in progress ಠ_ಠ package upload import ( "context" - "crypto/md5" - "crypto/sha1" - "encoding/hex" - "encoding/json" "fmt" - "hash" - "hash/adler32" - "io" - "io/fs" + iofs "io/fs" "os" "path/filepath" "strings" @@ -38,15 +77,15 @@ import ( "github.com/cs3org/reva/v2/pkg/appctx" ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" "github.com/cs3org/reva/v2/pkg/errtypes" - "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/tus" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" - "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree" "github.com/cs3org/reva/v2/pkg/utils" - "github.com/golang-jwt/jwt" + "github.com/google/uuid" "github.com/pkg/errors" - "github.com/rs/zerolog" + "github.com/rogpeppe/go-internal/lockedfile" tusd "github.com/tus/tusd/pkg/handler" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/trace" @@ -58,417 +97,497 @@ func init() { tracer = otel.Tracer("github.com/cs3org/reva/pkg/storage/utils/decomposedfs/upload") } -// Tree is used to manage a tree hierarchy -type Tree interface { - Setup() error - - GetMD(ctx context.Context, node *node.Node) (os.FileInfo, error) - ListFolder(ctx context.Context, node *node.Node) ([]*node.Node, error) - // CreateHome(owner *userpb.UserId) (n *node.Node, err error) - CreateDir(ctx context.Context, node *node.Node) (err error) - // CreateReference(ctx context.Context, node *node.Node, targetURI *url.URL) error - Move(ctx context.Context, oldNode *node.Node, newNode *node.Node) (err error) - Delete(ctx context.Context, node *node.Node) (err error) - RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPath string, target *node.Node) (*node.Node, *node.Node, func() error, error) - PurgeRecycleItemFunc(ctx context.Context, spaceid, key, purgePath string) (*node.Node, func() error, error) - - WriteBlob(node *node.Node, binPath string) error - ReadBlob(node *node.Node) (io.ReadCloser, error) - DeleteBlob(node *node.Node) error - - Propagate(ctx context.Context, node *node.Node, sizeDiff int64) (err error) -} +// CreateNewRevision will create a new revision node +func CreateNewRevision(ctx context.Context, lu *lookup.Lookup, path string, fsize uint64) (*lockedfile.File, error) { + _, span := tracer.Start(ctx, "CreateNewRevision") + defer span.End() -// Upload processes the upload -// it implements tus tusd.Upload interface https://tus.io/protocols/resumable-upload.html#core-protocol -// it also implements its termination extension as specified in https://tus.io/protocols/resumable-upload.html#termination -// it also implements its creation-defer-length extension as specified in https://tus.io/protocols/resumable-upload.html#creation -// it also implements its concatenation extension as specified in https://tus.io/protocols/resumable-upload.html#concatenation -type Upload struct { - // we use a struct field on the upload as tus pkg will give us an empty context.Background - Ctx context.Context - // info stores the current information about the upload - Info tusd.FileInfo - // node for easy access - Node *node.Node - // SizeDiff 
size difference between new and old file version - SizeDiff int64 - // infoPath is the path to the .info file - infoPath string - // binPath is the path to the binary file (which has no extension) - binPath string - // lu and tp needed for file operations - lu *lookup.Lookup - tp Tree - // versionsPath will be empty if there was no file before - versionsPath string - // and a logger as well - log zerolog.Logger - // publisher used to publish events - pub events.Publisher - // async determines if uploads shoud be done asynchronously - async bool - // tknopts hold token signing information - tknopts options.TokenOptions -} + // create folder structure (if needed) + if err := os.MkdirAll(filepath.Dir(path), 0700); err != nil { + return nil, err + } -func buildUpload(ctx context.Context, info tusd.FileInfo, binPath string, infoPath string, lu *lookup.Lookup, tp Tree, pub events.Publisher, async bool, tknopts options.TokenOptions) *Upload { - return &Upload{ - Info: info, - binPath: binPath, - infoPath: infoPath, - lu: lu, - tp: tp, - Ctx: ctx, - pub: pub, - async: async, - tknopts: tknopts, - log: appctx.GetLogger(ctx). - With(). - Interface("info", info). - Str("binPath", binPath). - Logger(), + // create and write lock new node metadata by parentid/name + f, err := lockedfile.OpenFile(lu.MetadataBackend().LockfilePath(path), os.O_RDWR|os.O_CREATE, 0600) + if err != nil { + return nil, err } + + return f, nil } -// Cleanup cleans the upload -func Cleanup(upload *Upload, failure bool, keepUpload bool) { - ctx, span := tracer.Start(upload.Ctx, "Cleanup") +// CreateNewNode will lock the given node and try to symlink it to the parent +func CreateNewNode(ctx context.Context, lu *lookup.Lookup, n *node.Node, fsize uint64) (*lockedfile.File, error) { + ctx, span := tracer.Start(ctx, "CreateNewNode") defer span.End() - upload.cleanup(failure, !keepUpload, !keepUpload) - // unset processing status - if upload.Node != nil { // node can be nil when there was an error before it was created (eg. checksum-mismatch) - if err := upload.Node.UnmarkProcessing(ctx, upload.Info.ID); err != nil { - upload.log.Info().Str("path", upload.Node.InternalPath()).Err(err).Msg("unmarking processing failed") - } + // create folder structure (if needed) + if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil { + return nil, err } -} -// WriteChunk writes the stream from the reader to the given offset of the upload -func (upload *Upload) WriteChunk(_ context.Context, offset int64, src io.Reader) (int64, error) { - ctx, span := tracer.Start(upload.Ctx, "WriteChunk") - defer span.End() - _, subspan := tracer.Start(ctx, "os.OpenFile") - file, err := os.OpenFile(upload.binPath, os.O_WRONLY|os.O_APPEND, defaultFilePerm) - subspan.End() + // create and write lock new node metadata by parentid/name + f, err := lockedfile.OpenFile(lu.MetadataBackend().LockfilePath(n.InternalPath()), os.O_RDWR|os.O_CREATE, 0600) if err != nil { - return 0, err + return nil, err } - defer file.Close() - // calculate cheksum here? needed for the TUS checksum extension. https://tus.io/protocols/resumable-upload.html#checksum - // TODO but how do we get the `Upload-Checksum`? WriteChunk() only has a context, offset and the reader ... - // It is sent with the PATCH request, well or in the POST when the creation-with-upload extension is used - // but the tus handler uses a context.Background() so we cannot really check the header and put it in the context ... 
- _, subspan = tracer.Start(ctx, "io.Copy") - n, err := io.Copy(file, src) - subspan.End() + // we also need to touch the actual node file here it stores the mtime of the resource + h, err := os.OpenFile(n.InternalPath(), os.O_CREATE|os.O_EXCL, 0600) + if err != nil { + return f, err + } + h.Close() - // If the HTTP PATCH request gets interrupted in the middle (e.g. because - // the user wants to pause the upload), Go's net/http returns an io.ErrUnexpectedEOF. - // However, for the ocis driver it's not important whether the stream has ended - // on purpose or accidentally. - if err != nil && err != io.ErrUnexpectedEOF { - return n, err + if _, err := node.CheckQuota(ctx, n.SpaceRoot, false, 0, fsize); err != nil { + return f, err } - upload.Info.Offset += n - return n, upload.writeInfo() -} + // link child name to parent if it is new + childNameLink := filepath.Join(n.ParentPath(), n.Name) + relativeNodePath := filepath.Join("../../../../../", lookup.Pathify(n.ID, 4, 2)) + log := appctx.GetLogger(ctx).With().Str("childNameLink", childNameLink).Str("relativeNodePath", relativeNodePath).Logger() + log.Info().Msg("createNewNode: creating symlink") -// GetInfo returns the FileInfo -func (upload *Upload) GetInfo(_ context.Context) (tusd.FileInfo, error) { - return upload.Info, nil -} + if err = os.Symlink(relativeNodePath, childNameLink); err != nil { + log.Info().Err(err).Msg("createNewNode: symlink failed") + if errors.Is(err, iofs.ErrExist) { + log.Info().Err(err).Msg("createNewNode: symlink already exists") -// GetReader returns an io.Reader for the upload -func (upload *Upload) GetReader(_ context.Context) (io.Reader, error) { - _, span := tracer.Start(upload.Ctx, "GetReader") - defer span.End() - return os.Open(upload.binPath) + return f, errtypes.AlreadyExists(n.Name) + } + return f, errors.Wrap(err, "Decomposedfs: could not symlink child entry") + } + log.Info().Msg("createNewNode: symlink created") + + return f, nil } -// FinishUpload finishes an upload and moves the file to the internal destination -func (upload *Upload) FinishUpload(_ context.Context) error { - ctx, span := tracer.Start(upload.Ctx, "FinishUpload") +// AddRevisionToNode will create the target node for the Upload +// TODO this should be CreateRevision +// - if the node does not exist it is created and assigned an id, no blob id? +// - then always write out a revision node +// - when postprocessing finishes copy metadata to node and replace latest revision node with previous blob info. if blobid is empty delete previous revision completely? +func AddRevisionToNode(ctx context.Context, lu *lookup.Lookup, info tusd.FileInfo, attrs node.Attributes) (*node.Node, error) { + ctx, span := tracer.Start(ctx, "AddRevisionToNode") defer span.End() - // set lockID to context - if upload.Info.MetaData["lockid"] != "" { - upload.Ctx = ctxpkg.ContextSetLockID(upload.Ctx, upload.Info.MetaData["lockid"]) - } - - log := appctx.GetLogger(upload.Ctx) - - // calculate the checksum of the written bytes - // they will all be written to the metadata later, so we cannot omit any of them - // TODO only calculate the checksum in sync that was requested to match, the rest could be async ... but the tests currently expect all to be present - // TODO the hashes all implement BinaryMarshaler so we could try to persist the state for resumable upload. we would neet do keep track of the copied bytes ... 
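// Editor's aside (a rough sketch under assumptions, not part of this patch):
// the on-disk layout that CreateNewNode and AddRevisionToNode above operate
// on. Assuming lookup.Pathify(id, 4, 2) spreads the first 4x2 characters of
// the node id into nested directories and revisions are keyed by their
// RevisionTime via node.RevisionIDDelimiter, a node roughly looks like:
//
//	<spaceroot>/nodes/ab/cd/12/34/<rest-of-node-id>                  node metadata
//	<spaceroot>/nodes/ab/cd/12/34/<rest-of-node-id>.REV.<timestamp>  revision node
//	<parent-path>/<child-name> -> ../../../../../ab/cd/12/34/<rest-of-node-id>
//
// so a rename only retargets the child symlink while node and revision
// metadata stay in place.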
- sha1h := sha1.New()
- md5h := md5.New()
- adler32h := adler32.New()
- {
- _, subspan := tracer.Start(ctx, "os.Open")
- f, err := os.Open(upload.binPath)
- subspan.End()
- if err != nil {
- // we can continue if no oc checksum header is set
- log.Info().Err(err).Str("binPath", upload.binPath).Msg("error opening binPath")
- }
- defer f.Close()
+ log := appctx.GetLogger(ctx).With().Str("uploadID", info.ID).Logger()
- r1 := io.TeeReader(f, sha1h)
- r2 := io.TeeReader(r1, md5h)
+ // check lock
+ if info.MetaData[tus.CS3Prefix+"lockid"] != "" {
+ ctx = ctxpkg.ContextSetLockID(ctx, info.MetaData[tus.CS3Prefix+"lockid"])
+ }
+
+ var err error
+
+ // FIXME should uploads fail if they try to overwrite an existing file?
+ // but if the webdav overwrite header is set ... two concurrent requests might each create a node with a different id ... -> same problem
+ // two concurrent requests that would create a new node would return different ids ...
+ // what if we generate an id based on the parent id and the filename?
+ // - no, then renaming the file and recreating a node with the previous name would generate the same id
+ // -> we have to create the node on initialize upload with processing true?
- _, subspan = tracer.Start(ctx, "io.Copy")
- _, err = io.Copy(adler32h, r2)
- subspan.End()
+ var n *node.Node
+ var revisionHandle, nodeHandle *lockedfile.File
+ if info.MetaData[tus.CS3Prefix+"NodeId"] == "" {
+ // we need to check if the node exists via parentid & child name
+ p, err := node.ReadNode(ctx, lu, info.MetaData[tus.CS3Prefix+"SpaceRoot"], info.MetaData[tus.CS3Prefix+"NodeParentId"], false, nil, true)
+ if err != nil {
+ log.Error().Err(err).Msg("could not read parent node")
+ return nil, err
+ }
+ if !p.Exists {
+ return nil, errtypes.PreconditionFailed("parent does not exist")
+ }
+ n, err = p.Child(ctx, info.MetaData[tus.CS3Prefix+"filename"])
 if err != nil {
- log.Info().Err(err).Msg("error copying checksums")
+ log.Error().Err(err).Msg("could not read child node")
+ return nil, err
+ }
+ if !n.Exists {
+ n.ID = uuid.New().String()
+ nodeHandle, err = initNewNode(ctx, lu, info, n)
+ if err != nil {
+ log.Error().Err(err).Msg("could not init new node")
+ return nil, err
+ }
+ log.Info().Str("lockfile", nodeHandle.Name()).Msg("got lock file from initNewNode")
+ } else {
+ nodeHandle, err = openExistingNode(ctx, lu, n)
+ if err != nil {
+ log.Error().Err(err).Msg("could not open existing node")
+ return nil, err
+ }
+ log.Info().Str("lockfile", nodeHandle.Name()).Msg("got lock file from openExistingNode")
 }
 }
- // compare if they match the sent checksum
- // TODO the tus checksum extension would do this on every chunk, but I currently don't see an easy way to pass in the requested checksum. for now we do it in FinishUpload which is also called for chunked uploads
- if upload.Info.MetaData["checksum"] != "" {
- var err error
- parts := strings.SplitN(upload.Info.MetaData["checksum"], " ", 2)
- if len(parts) != 2 {
- return errtypes.BadRequest("invalid checksum format. must be '[algorithm] [checksum]'")
- }
- switch parts[0] {
- case "sha1":
- err = upload.checkHash(parts[1], sha1h)
- case "md5":
- err = upload.checkHash(parts[1], md5h)
- case "adler32":
- err = upload.checkHash(parts[1], adler32h)
- default:
- err = errtypes.BadRequest("unsupported checksum algorithm: " + parts[0])
+ if nodeHandle == nil {
+ n, err = node.ReadNode(ctx, lu, info.MetaData[tus.CS3Prefix+"SpaceRoot"], info.MetaData[tus.CS3Prefix+"NodeId"], false, nil, true)
+ if err != nil {
+ log.Error().Err(err).Msg("could not read node")
+ return nil, err
+ }
+ nodeHandle, err = openExistingNode(ctx, lu, n)
 if err != nil {
- Cleanup(upload, true, false)
- return err
+ log.Error().Err(err).Msg("could not open existing node")
+ return nil, err
 }
+ log.Info().Str("lockfile", nodeHandle.Name()).Msg("got lock file from openExistingNode")
 }
+ defer func() {
+ if nodeHandle == nil {
+ return
+ }
+ if err := nodeHandle.Close(); err != nil {
+ log.Error().Err(err).Str("nodeid", n.ID).Str("parentid", n.ParentID).Msg("could not close lock")
+ }
+ }()
- // update checksums
- attrs := node.Attributes{
- prefixes.ChecksumPrefix + "sha1": sha1h.Sum(nil),
- prefixes.ChecksumPrefix + "md5": md5h.Sum(nil),
- prefixes.ChecksumPrefix + "adler32": adler32h.Sum(nil),
+ err = validateRequest(ctx, info, n)
+ if err != nil {
+ return nil, err
 }
- n, err := CreateNodeForUpload(upload, attrs)
+ newBlobID := uuid.New().String()
+
+ // set processing status of node
+ nodeAttrs := node.Attributes{}
+ // store new Blobid and Blobsize in node
+ // nodeAttrs.SetString(prefixes.BlobIDAttr, newBlobID) // BlobID is checked when removing a revision to decide if we also need to delete the node
+ // hm ... check if any other revisions are still available?
+ nodeAttrs.SetInt64(prefixes.BlobsizeAttr, info.Size) // FIXME ... argh now the propagation needs to revert the size diff propagation again
+ nodeAttrs.SetString(prefixes.StatusPrefix, node.ProcessingStatus+info.ID)
+ err = n.SetXattrsWithContext(ctx, nodeAttrs, false)
 if err != nil {
- Cleanup(upload, true, false)
- return err
+ return nil, errors.Wrap(err, "Decomposedfs: could not write metadata")
 }
- upload.Node = n
+ revisionNode, err := n.ReadRevision(ctx, info.MetaData[tus.CS3Prefix+"RevisionTime"])
+ if err != nil {
+ return nil, err
+ }
- if upload.pub != nil {
- u, _ := ctxpkg.ContextGetUser(upload.Ctx)
- s, err := upload.URL(upload.Ctx)
- if err != nil {
- return err
+ revisionHandle, err = createRevisionNode(ctx, lu, revisionNode)
+ defer func() {
+ if revisionHandle == nil {
+ return
 }
-
- if err := events.Publish(ctx, upload.pub, events.BytesReceived{
- UploadID: upload.Info.ID,
- URL: s,
- SpaceOwner: n.SpaceOwnerOrManager(upload.Ctx),
- ExecutingUser: u,
- ResourceID: &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID},
- Filename: upload.Info.Storage["NodeName"],
- Filesize: uint64(upload.Info.Size),
- }); err != nil {
- return err
+ if err := revisionHandle.Close(); err != nil {
+ log.Error().Err(err).Str("nodeid", revisionNode.ID).Str("parentid", revisionNode.ParentID).Msg("could not close lock")
 }
+ }()
+ if err != nil {
+ return nil, err
 }
- if !upload.async {
- // handle postprocessing synchronously
- err = upload.Finalize()
- Cleanup(upload, err != nil, false)
+ // set upload related metadata
+ if info.MetaData[tus.TusPrefix+"mtime"] == "" {
+ attrs.SetString(prefixes.MTimeAttr, info.MetaData[tus.CS3Prefix+"RevisionTime"])
+ } else {
+ // overwrite mtime if requested
+ mtime, err := utils.MTimeToTime(info.MetaData[tus.TusPrefix+"mtime"])
 if err != nil {
- log.Error().Err(err).Msg("failed to upload")
- return err
+ return nil, err
 }
+ attrs.SetString(prefixes.MTimeAttr, mtime.UTC().Format(time.RFC3339Nano))
 }
+ attrs.SetString(prefixes.BlobIDAttr, newBlobID)
+ attrs.SetInt64(prefixes.BlobsizeAttr, info.Size)
+ // TODO we should persist all versions as writes with ranges and the blobid in the node metadata
+ // attrs.SetString(prefixes.StatusPrefix, node.ProcessingStatus+info.ID)
- return upload.tp.Propagate(upload.Ctx, n, upload.SizeDiff)
-}
+ err = revisionNode.SetXattrsWithContext(ctx, attrs, false)
+ if err != nil {
+ return nil, errors.Wrap(err, "Decomposedfs: could not write metadata")
+ }
-// Terminate terminates the upload
-func (upload *Upload) Terminate(_ context.Context) error {
- upload.cleanup(true, true, true)
- return nil
+ return n, nil
 }
-// DeclareLength updates the upload length information
-func (upload *Upload) DeclareLength(_ context.Context, length int64) error {
- upload.Info.Size = length
- upload.Info.SizeIsDeferred = false
- return upload.writeInfo()
-}
+func validateRequest(ctx context.Context, info tusd.FileInfo, n *node.Node) error {
+ if err := n.CheckLock(ctx); err != nil {
+ return err
+ }
+
+ if _, err := node.CheckQuota(ctx, n.SpaceRoot, true, uint64(n.Blobsize), uint64(info.Size)); err != nil {
+ return err
+ }
-// ConcatUploads concatenates multiple uploads
-func (upload *Upload) ConcatUploads(_ context.Context, uploads []tusd.Upload) (err error) {
- file, err := os.OpenFile(upload.binPath, os.O_WRONLY|os.O_APPEND, defaultFilePerm)
+ mtime, err := n.GetMTime(ctx)
 if err != nil {
 return err
 }
- defer file.Close()
+ currentEtag, err := node.CalculateEtag(n, mtime)
+ if err != nil {
+ return err
+ }
+
+ // When the if-match header was set we need to check if the
+ // etag still matches before finishing the upload.
+ if ifMatch, ok := info.MetaData[tus.CS3Prefix+"if-match"]; ok {
+ if ifMatch != currentEtag {
+ return errtypes.Aborted("etag mismatch")
+ }
+ }
- for _, partialUpload := range uploads {
- fileUpload := partialUpload.(*Upload)
+ // When the if-none-match header was set we need to check if any of the
+ // etags matches before finishing the upload.
+ if ifNoneMatch, ok := info.MetaData[tus.CS3Prefix+"if-none-match"]; ok {
+ if ifNoneMatch == "*" {
+ return errtypes.Aborted("etag mismatch, resource exists")
+ }
+ for _, ifNoneMatchTag := range strings.Split(ifNoneMatch, ",") {
+ if ifNoneMatchTag == currentEtag {
+ return errtypes.Aborted("etag mismatch")
+ }
+ }
+ }
- src, err := os.Open(fileUpload.binPath)
+ // When the if-unmodified-since header was set we need to check if the
+ // resource was modified after that time before finishing the upload.
+ if ifUnmodifiedSince, ok := info.MetaData[tus.CS3Prefix+"if-unmodified-since"]; ok {
 if err != nil {
- return err
+ return errtypes.InternalError(fmt.Sprintf("failed to read mtime of node: %s", err))
+ }
+ ifUnmodifiedSince, err := time.Parse(time.RFC3339Nano, ifUnmodifiedSince)
+ if err != nil {
+ return errtypes.InternalError(fmt.Sprintf("failed to parse if-unmodified-since time: %s", err))
 }
- defer src.Close()
- if _, err := io.Copy(file, src); err != nil {
- return err
+ if mtime.After(ifUnmodifiedSince) {
+ return errtypes.Aborted("if-unmodified-since mismatch")
 }
 }
+ return nil
+}
- return
+func openExistingNode(ctx context.Context, lu *lookup.Lookup, n *node.Node) (*lockedfile.File, error) {
+ // create and read lock existing node metadata
+ return lockedfile.OpenFile(lu.MetadataBackend().LockfilePath(n.InternalPath()), os.O_RDONLY, 0600)
 }
+func initNewNode(ctx context.Context, lu *lookup.Lookup, info tusd.FileInfo, n *node.Node) (*lockedfile.File, error) {
+ nodePath := n.InternalPath()
+ // create folder structure (if needed)
+ if err := os.MkdirAll(filepath.Dir(nodePath), 0700); err != nil {
+ return nil, err
+ }
-// writeInfo updates the entire information. Everything will be overwritten.
-func (upload *Upload) writeInfo() error {
- _, span := tracer.Start(upload.Ctx, "writeInfo")
- defer span.End()
- data, err := json.Marshal(upload.Info)
+ // create and write lock new node metadata
+ f, err := lockedfile.OpenFile(lu.MetadataBackend().LockfilePath(nodePath), os.O_RDWR|os.O_CREATE, 0600)
 if err != nil {
- return err
+ return nil, err
 }
- return os.WriteFile(upload.infoPath, data, defaultFilePerm)
-}
-// Finalize finalizes the upload (eg moves the file to the internal destination)
-func (upload *Upload) Finalize() (err error) {
- ctx, span := tracer.Start(upload.Ctx, "Finalize")
- defer span.End()
- n := upload.Node
- if n == nil {
- var err error
- n, err = node.ReadNode(ctx, upload.lu, upload.Info.Storage["SpaceRoot"], upload.Info.Storage["NodeId"], false, nil, false)
- if err != nil {
- return err
+ // FIXME if this is removed links to files will be dangling, causing subsequent stats to files to fail
+ // we also need to touch the actual node file here, as it stores the mtime of the resource
+ h, err := os.OpenFile(nodePath, os.O_CREATE|os.O_EXCL, 0600)
+ if err != nil {
+ return f, err
+ }
+ h.Close()
+
+ // link child name to parent if it is new
+ childNameLink := filepath.Join(n.ParentPath(), n.Name)
+ relativeNodePath := filepath.Join("../../../../../", lookup.Pathify(n.ID, 4, 2))
+ log := appctx.GetLogger(ctx).With().Str("childNameLink", childNameLink).Str("relativeNodePath", relativeNodePath).Logger()
+ log.Info().Msg("initNewNode: creating symlink")
+
+ if err = os.Symlink(relativeNodePath, childNameLink); err != nil {
+ log.Info().Err(err).Msg("initNewNode: symlink failed")
+ if errors.Is(err, iofs.ErrExist) {
+ log.Info().Err(err).Msg("initNewNode: symlink already exists")
+ return f, errtypes.AlreadyExists(n.Name)
 }
- upload.Node = n
+ return f, errors.Wrap(err, "Decomposedfs: could not symlink child entry")
 }
+ log.Info().Msg("initNewNode: symlink created")
- // upload the data to the blobstore
- _, subspan := tracer.Start(ctx, "WriteBlob")
- err = upload.tp.WriteBlob(n, upload.binPath)
- subspan.End()
+ attrs := node.Attributes{}
+ attrs.SetInt64(prefixes.TypeAttr, int64(provider.ResourceType_RESOURCE_TYPE_FILE))
+ attrs.SetString(prefixes.ParentidAttr, n.ParentID)
+ attrs.SetString(prefixes.NameAttr, n.Name)
+ attrs.SetString(prefixes.MTimeAttr, info.MetaData[tus.CS3Prefix+"RevisionTime"])
+
+ // here we set the status the first time.
+ attrs.SetString(prefixes.StatusPrefix, node.ProcessingStatus+info.ID)
+
+ // update node metadata with basic metadata
+ err = n.SetXattrsWithContext(ctx, attrs, false)
 if err != nil {
- return errors.Wrap(err, "failed to upload file to blostore")
+ return nil, errors.Wrap(err, "Decomposedfs: could not write metadata")
 }
-
- return nil
+ return f, nil
 }
-func (upload *Upload) checkHash(expected string, h hash.Hash) error {
- if expected != hex.EncodeToString(h.Sum(nil)) {
- return errtypes.ChecksumMismatch(fmt.Sprintf("invalid checksum: expected %s got %x", upload.Info.MetaData["checksum"], h.Sum(nil)))
+func createRevisionNode(ctx context.Context, lu *lookup.Lookup, revisionNode *node.Node) (*lockedfile.File, error) {
+ revisionPath := revisionNode.InternalPath()
+ // write lock existing node before reading any metadata
+ f, err := lockedfile.OpenFile(lu.MetadataBackend().LockfilePath(revisionPath), os.O_RDWR|os.O_CREATE, 0600)
+ if err != nil {
+ return nil, err
 }
- return nil
+
+ // FIXME if this is removed listing revisions breaks because it globs the dir but then filters all metadata files
+ // we also need to touch the revisions node here to list revisions
+ h, err := os.OpenFile(revisionPath, os.O_CREATE|os.O_EXCL, 0600)
+ if err != nil {
+ return f, err
+ }
+ h.Close()
+ return f, nil
 }
-// cleanup cleans up after the upload is finished
-func (upload *Upload) cleanup(cleanNode, cleanBin, cleanInfo bool) {
- if cleanNode && upload.Node != nil {
- switch p := upload.versionsPath; p {
- case "":
- // remove node
- if err := utils.RemoveItem(upload.Node.InternalPath()); err != nil {
- upload.log.Info().Str("path", upload.Node.InternalPath()).Err(err).Msg("removing node failed")
- }
+func SetNodeToRevision(ctx context.Context, lu *lookup.Lookup, n *node.Node, revision string) (int64, error) {
- // no old version was present - remove child entry
- src := filepath.Join(upload.Node.ParentPath(), upload.Node.Name)
- if err := os.Remove(src); err != nil {
- upload.log.Info().Str("path", upload.Node.ParentPath()).Err(err).Msg("removing node from parent failed")
- }
+ nodePath := n.InternalPath()
+ // lock existing node metadata
+ f, err := lockedfile.OpenFile(lu.MetadataBackend().LockfilePath(nodePath), os.O_RDWR, 0600)
+ if err != nil {
+ return 0, err
+ }
+ defer f.Close()
+ // re-read the node
- // remove node from upload as it no longer exists
- upload.Node = nil
- default:
+ n, err = node.ReadNode(ctx, lu, n.SpaceID, n.ID, false, n.SpaceRoot, true)
+ if err != nil {
+ return 0, err
+ }
- if err := upload.lu.CopyMetadata(upload.Ctx, p, upload.Node.InternalPath(), func(attributeName string, value []byte) (newValue []byte, copy bool) {
- return value, strings.HasPrefix(attributeName, prefixes.ChecksumPrefix) ||
- attributeName == prefixes.TypeAttr ||
- attributeName == prefixes.BlobIDAttr ||
- attributeName == prefixes.BlobsizeAttr ||
- attributeName == prefixes.MTimeAttr
- }, true); err != nil {
- upload.log.Info().Str("versionpath", p).Str("nodepath", upload.Node.InternalPath()).Err(err).Msg("renaming version node failed")
- }
+ revisionNode, err := n.ReadRevision(ctx, revision)
+ if err != nil {
+ return 0, err
+ }
- if err := os.RemoveAll(p); err != nil {
- upload.log.Info().Str("versionpath", p).Str("nodepath", upload.Node.InternalPath()).Err(err).Msg("error removing version")
- }
+ sizeDiff := revisionNode.Blobsize - n.Blobsize
+
+ n.BlobID = revisionNode.BlobID
+ n.Blobsize = revisionNode.Blobsize
+ revisionAttrs, err := revisionNode.Xattrs(ctx)
+ if err != nil {
+ return 0, err
+ }
+ attrs := node.Attributes{}
+ attrs.SetString(prefixes.BlobIDAttr, revisionNode.BlobID)
+ attrs.SetInt64(prefixes.BlobsizeAttr, revisionNode.Blobsize)
+ attrs[prefixes.MTimeAttr] = revisionAttrs[prefixes.MTimeAttr]
+
+ // copy checksums TODO we need to make sure ALL old checksums are wiped
+ for k, v := range revisionAttrs {
+ if strings.HasPrefix(k, prefixes.ChecksumPrefix) {
+ attrs[k] = v
 }
 }
- if cleanBin {
- if err := os.Remove(upload.binPath); err != nil && !errors.Is(err, fs.ErrNotExist) {
- upload.log.Error().Str("path", upload.binPath).Err(err).Msg("removing upload failed")
- }
+ err = n.SetXattrsWithContext(ctx, attrs, false)
+ if err != nil {
+ return 0, errors.Wrap(err, "Decomposedfs: could not write metadata")
 }
- if cleanInfo {
- if err := os.Remove(upload.infoPath); err != nil && !errors.Is(err, fs.ErrNotExist) {
- upload.log.Error().Str("path", upload.infoPath).Err(err).Msg("removing upload info failed")
+ return sizeDiff, nil
+}
+
+func ReadNode(ctx context.Context, lu *lookup.Lookup, info tusd.FileInfo) (*node.Node, error) {
+ var n *node.Node
+ var err error
+ if info.MetaData[tus.CS3Prefix+"NodeId"] == "" {
+ p, err := node.ReadNode(ctx, lu, info.MetaData[tus.CS3Prefix+"SpaceRoot"], info.MetaData[tus.CS3Prefix+"NodeParentId"], false, nil, true)
+ if err != nil {
+ return nil, err
+ }
+ n, err = p.Child(ctx, info.MetaData[tus.CS3Prefix+"filename"])
+ if err != nil {
+ return nil, err
+ }
+ } else {
+ n, err = node.ReadNode(ctx, lu, info.MetaData[tus.CS3Prefix+"SpaceRoot"], info.MetaData[tus.CS3Prefix+"NodeId"], false, nil, true)
+ if err != nil {
+ return nil, err
 }
 }
+ return n, nil
 }
-// URL returns a url to download an upload
-func (upload *Upload) URL(_ context.Context) (string, error) {
- type transferClaims struct {
- jwt.StandardClaims
- Target string `json:"target"`
+// Cleanup cleans the upload
+func Cleanup(ctx context.Context, lu *lookup.Lookup, n *node.Node, info tusd.FileInfo, failure bool) {
+ ctx, span := tracer.Start(ctx, "Cleanup")
+ defer span.End()
+
+ if n != nil { // node can be nil when there was an error before it was created (eg. checksum-mismatch)
+ if failure {
+ removeRevision(ctx, lu, n, info.MetaData[tus.CS3Prefix+"RevisionTime"])
+ }
+ // unset processing status
+ if err := n.UnmarkProcessing(ctx, info.ID); err != nil {
+ log := appctx.GetLogger(ctx)
+ log.Info().Str("path", n.InternalPath()).Err(err).Msg("unmarking processing failed")
+ }
 }
+}
- u := joinurl(upload.tknopts.DownloadEndpoint, "tus/", upload.Info.ID)
- ttl := time.Duration(upload.tknopts.TransferExpires) * time.Second
- claims := transferClaims{
- StandardClaims: jwt.StandardClaims{
- ExpiresAt: time.Now().Add(ttl).Unix(),
- Audience: "reva",
- IssuedAt: time.Now().Unix(),
- },
- Target: u,
+// removeRevision cleans up after the upload is finished
+func removeRevision(ctx context.Context, lu *lookup.Lookup, n *node.Node, revision string) {
+ log := appctx.GetLogger(ctx)
+ nodePath := n.InternalPath()
+ revisionPath := nodePath + node.RevisionIDDelimiter + revision
+ // remove revision
+ if err := utils.RemoveItem(revisionPath); err != nil {
+ log.Info().Str("path", revisionPath).Err(err).Msg("removing revision failed")
+ }
+ // purge revision metadata to clean up cache
+ if err := lu.MetadataBackend().Purge(revisionPath); err != nil {
+ log.Info().Str("path", revisionPath).Err(err).Msg("purging revision metadata failed")
+ }
+
+ if n.BlobID == "" { // FIXME ... this is brittle
+ // no old version was present - remove child entry symlink from directory
+ src := filepath.Join(n.ParentPath(), n.Name)
+ if err := os.Remove(src); err != nil {
+ log.Info().Str("path", n.ParentPath()).Err(err).Msg("removing node from parent failed")
+ }
- t := jwt.NewWithClaims(jwt.GetSigningMethod("HS256"), claims)
+ // delete node
+ if err := utils.RemoveItem(nodePath); err != nil {
+ log.Info().Str("path", nodePath).Err(err).Msg("removing node failed")
+ }
- tkn, err := t.SignedString([]byte(upload.tknopts.TransferSharedSecret))
- if err != nil {
- return "", errors.Wrapf(err, "error signing token with claims %+v", claims)
+ // purge node metadata to clean up cache
+ if err := lu.MetadataBackend().Purge(nodePath); err != nil {
+ log.Info().Str("path", nodePath).Err(err).Msg("purging node metadata failed")
+ }
 }
-
- return joinurl(upload.tknopts.DataGatewayEndpoint, tkn), nil
 }
-// replace with url.JoinPath after switching to go1.19
-func joinurl(paths ...string) string {
- var s strings.Builder
- l := len(paths)
- for i, p := range paths {
- s.WriteString(p)
- if !strings.HasSuffix(p, "/") && i != l-1 {
- s.WriteString("/")
+// Finalize finalizes the upload (eg moves the file to the internal destination)
+func Finalize(ctx context.Context, blobstore tree.Blobstore, info tusd.FileInfo, n *node.Node) error {
+ _, span := tracer.Start(ctx, "Finalize")
+ defer span.End()
+
+ rn, err := n.ReadRevision(ctx, info.MetaData[tus.CS3Prefix+"RevisionTime"])
+ if err != nil {
+ return errors.Wrap(err, "failed to read revision")
+ }
+ if mover, ok := blobstore.(tree.BlobstoreMover); ok {
+ err = mover.MoveBlob(rn, "", info.Storage["Bucket"], info.Storage["Key"])
+ switch err {
+ case nil:
+ return nil
+ case tree.ErrBlobstoreCannotMove:
+ // fallback below
+ default:
+ return err
 }
 }
- return s.String()
+ // upload the data to the blobstore
+ _, subspan := tracer.Start(ctx, "WriteBlob")
+ err = blobstore.Upload(rn, info.Storage["Path"]) // FIXME where do we read from
+ subspan.End()
+ if err != nil {
+ return errors.Wrap(err, "failed to upload file to blobstore")
+ }
+
+ // FIXME use a reader
+ return nil
 }
diff --git a/pkg/storage/utils/decomposedfs/upload_async_test.go b/pkg/storage/utils/decomposedfs/upload_async_test.go
index 87a4fa7317..e9bb771e7b 100644
--- a/pkg/storage/utils/decomposedfs/upload_async_test.go
+++ b/pkg/storage/utils/decomposedfs/upload_async_test.go
@@ -27,6 +27,8 @@ import (
 "github.com/cs3org/reva/v2/pkg/utils"
 "github.com/cs3org/reva/v2/tests/helpers"
 "github.com/stretchr/testify/mock"
+ "github.com/tus/tusd/pkg/filestore"
+ tusd "github.com/tus/tusd/pkg/handler"
 "google.golang.org/grpc"
 . "github.com/onsi/ginkgo/v2"
@@ -67,6 +69,7 @@ var _ = Describe("Async file uploads", Ordered, func() {
 con chan interface{}
 uploadID string
+ dataStore tusd.DataStore
 fs storage.FS
 o *options.Options
 lu *lookup.Lookup
@@ -81,6 +84,8 @@ var _ = Describe("Async file uploads", Ordered, func() {
 tmpRoot, err := helpers.TempDir("reva-unit-tests-*-root")
 Expect(err).ToNot(HaveOccurred())
+ dataStore = filestore.New(filepath.Join(tmpRoot, "uploads"))
+
 o, err = options.New(map[string]interface{}{
 "root": tmpRoot,
 "asyncfileuploads": true,
@@ -119,7 +124,7 @@ var _ = Describe("Async file uploads", Ordered, func() {
 // setup fs
 pub, con = make(chan interface{}), make(chan interface{})
 tree := tree.New(lu, bs, o, store.Create())
- fs, err = New(o, lu, NewPermissions(permissions, permissionsSelector), tree, stream.Chan{pub, con})
+ fs, err = New(o, lu, NewPermissions(permissions, permissionsSelector), tree, stream.Chan{pub, con}, dataStore, bs)
 Expect(err).ToNot(HaveOccurred())
 resp, err := fs.CreateStorageSpace(ctx, &provider.CreateStorageSpaceRequest{Owner: user, Type: "personal"})
@@ -139,7 +144,7 @@
 })
 // start upload of a file
- uploadIds, err := fs.InitiateUpload(ctx, ref, 10, map[string]string{})
+ uploadIds, err := fs.InitiateUpload(ctx, ref, 10, nil)
 Expect(err).ToNot(HaveOccurred())
 Expect(len(uploadIds)).To(Equal(2))
 Expect(uploadIds["simple"]).ToNot(BeEmpty())
diff --git a/pkg/storage/utils/decomposedfs/upload_test.go b/pkg/storage/utils/decomposedfs/upload_test.go
index 44d4dd78c5..52dedf0879 100644
--- a/pkg/storage/utils/decomposedfs/upload_test.go
+++ b/pkg/storage/utils/decomposedfs/upload_test.go
@@ -23,6 +23,7 @@ import (
 "context"
 "io"
 "os"
+ "path/filepath"
 userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
 cs3permissions "github.com/cs3org/go-cs3apis/cs3/permissions/v1beta1"
@@ -45,6 +46,8 @@ import (
 "github.com/cs3org/reva/v2/pkg/store"
 "github.com/cs3org/reva/v2/tests/helpers"
 "github.com/stretchr/testify/mock"
+ "github.com/tus/tusd/pkg/filestore"
+ tusd "github.com/tus/tusd/pkg/handler"
 "google.golang.org/grpc"
 . "github.com/onsi/ginkgo/v2"
@@ -59,6 +62,7 @@ var _ = Describe("File uploads", func() {
 user *userpb.User
 ctx context.Context
+ dataStore tusd.DataStore
 o *options.Options
 lu *lookup.Lookup
 permissions *mocks.PermissionsChecker
@@ -97,6 +101,8 @@ var _ = Describe("File uploads", func() {
 tmpRoot, err := helpers.TempDir("reva-unit-tests-*-root")
 Expect(err).ToNot(HaveOccurred())
+ dataStore = filestore.New(filepath.Join(tmpRoot, "uploads"))
+
 o, err = options.New(map[string]interface{}{
 "root": tmpRoot,
 })
@@ -133,7 +139,7 @@
 }, nil).Times(1)
 var err error
 tree := tree.New(lu, bs, o, store.Create())
- fs, err = decomposedfs.New(o, lu, decomposedfs.NewPermissions(permissions, permissionsSelector), tree, nil)
+ fs, err = decomposedfs.New(o, lu, decomposedfs.NewPermissions(permissions, permissionsSelector), tree, nil, dataStore, bs)
 Expect(err).ToNot(HaveOccurred())
 resp, err := fs.CreateStorageSpace(ctx, &provider.CreateStorageSpaceRequest{Owner: user, Type: "personal"})
diff --git a/pkg/storage/utils/metadata/cs3.go b/pkg/storage/utils/metadata/cs3.go
index 2f0fed087e..429d0f5be6 100644
--- a/pkg/storage/utils/metadata/cs3.go
+++ b/pkg/storage/utils/metadata/cs3.go
@@ -181,6 +181,9 @@ func (cs3 *CS3) Upload(ctx context.Context, req UploadRequest) (*UploadResponse,
 ifuReq.Opaque = utils.AppendPlainToOpaque(ifuReq.Opaque, "X-OC-Mtime", strconv.Itoa(int(req.MTime.Unix()))+"."+strconv.Itoa(req.MTime.Nanosecond()))
 }
+ // FIXME ... we need a better way to transport filesize
+ // ifuReq.Opaque = utils.AppendPlainToOpaque(ifuReq.Opaque, net.HeaderUploadLength, strconv.FormatInt(int64(len(req.Content)), 10))
+
 res, err := client.InitiateFileUpload(ctx, ifuReq)
 if err != nil {
 return nil, err
diff --git a/tests/helpers/helpers.go b/tests/helpers/helpers.go
index e80f16fff4..2c4a0bcfac 100644
--- a/tests/helpers/helpers.go
+++ b/tests/helpers/helpers.go
@@ -94,6 +94,11 @@ func TempJSONFile(c any) (string, error) {
 // Upload can be used to initiate an upload and do the upload to a storage.FS in one step
 func Upload(ctx context.Context, fs storage.FS, ref *provider.Reference, content []byte) error {
+ /*
+ FIXME always send size ... needs updated nextcloud tests
+ size := len(content)
+ uploadIds, err := fs.InitiateUpload(ctx, ref, int64(size), map[string]string{})
+ */
 uploadIds, err := fs.InitiateUpload(ctx, ref, 0, map[string]string{})
 if err != nil {
 return err
diff --git a/tests/oc-integration-tests/drone/storage-users-s3ng.toml b/tests/oc-integration-tests/drone/storage-users-s3ng.toml
index db2f0ba336..252b24c2c5 100644
--- a/tests/oc-integration-tests/drone/storage-users-s3ng.toml
+++ b/tests/oc-integration-tests/drone/storage-users-s3ng.toml
@@ -28,6 +28,8 @@ permissionssvc = "localhost:10000"
 "s3.bucket" = "test"
 "s3.access_key" = "test"
 "s3.secret_key" = "test"
+"s3.disable_ssl" = true
+"s3.force_path_style" = true
 "metadata_backend" = "xattrs"
 # we have a locally running dataprovider
@@ -48,4 +50,6 @@ permissionssvc = "localhost:10000"
 "s3.bucket" = "test"
 "s3.access_key" = "test"
 "s3.secret_key" = "test"
+"s3.disable_ssl" = true
+"s3.force_path_style" = true
 "metadata_backend" = "xattrs"