diff --git a/changelog/unreleased/upload-expiration.md b/changelog/unreleased/upload-expiration.md new file mode 100644 index 0000000000..c493f895ca --- /dev/null +++ b/changelog/unreleased/upload-expiration.md @@ -0,0 +1,9 @@ +Enhancement: Upload expiration and cleanup + +We made storage providers aware of upload expiration and added an interface +for FS which support listing and purging expired uploads. + +We also implemented said interface for decomposedfs. + +https://github.com/cs3org/reva/pull/3095 +https://github.com/owncloud/ocis/pull/4256 diff --git a/go.mod b/go.mod index 69a0ee40dc..a0dfde8290 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/cheggaaa/pb v1.0.29 github.com/coreos/go-oidc v2.2.1+incompatible github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e - github.com/cs3org/go-cs3apis v0.0.0-20220711084433-8f71d4e812a3 + github.com/cs3org/go-cs3apis v0.0.0-20220719130120-361e9f987d64 github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8 github.com/dgraph-io/ristretto v0.1.0 github.com/eventials/go-tus v0.0.0-20220610120217-05d0564bb571 diff --git a/go.sum b/go.sum index 2982135e3f..679c376970 100644 --- a/go.sum +++ b/go.sum @@ -170,8 +170,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsr github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e h1:tqSPWQeueWTKnJVMJffz4pz0o1WuQxJ28+5x5JgaHD8= github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e/go.mod h1:XJEZ3/EQuI3BXTp/6DUzFr850vlxq11I6satRtz0YQ4= -github.com/cs3org/go-cs3apis v0.0.0-20220711084433-8f71d4e812a3 h1:QSQ2DGKPMChB4vHSs1Os9TnOJl21BrzKX9D5EtQfDog= -github.com/cs3org/go-cs3apis v0.0.0-20220711084433-8f71d4e812a3/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY= +github.com/cs3org/go-cs3apis v0.0.0-20220719130120-361e9f987d64 h1:cFnankJOCWndnOns4sKRG7yzH61ammK2Am6rEGWCK40= +github.com/cs3org/go-cs3apis v0.0.0-20220719130120-361e9f987d64/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY= github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8 h1:Z9lwXumT5ACSmJ7WGnFl+OMLLjpz5uR2fyz7dC255FI= github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8/go.mod h1:4abs/jPXcmJzYoYGF91JF9Uq9s/KL5n1jvFDix8KcqY= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/internal/grpc/services/gateway/storageprovider.go b/internal/grpc/services/gateway/storageprovider.go index cab0abbf7d..3fe3e9965e 100644 --- a/internal/grpc/services/gateway/storageprovider.go +++ b/internal/grpc/services/gateway/storageprovider.go @@ -81,13 +81,12 @@ type transferClaims struct { Target string `json:"target"` } -func (s *svc) sign(_ context.Context, target string) (string, error) { +func (s *svc) sign(_ context.Context, target string, expiresAt int64) (string, error) { // Tus sends a separate request to the datagateway service for every chunk. // For large files, this can take a long time, so we extend the expiration - ttl := time.Duration(s.c.TransferExpires) * time.Second claims := transferClaims{ StandardClaims: jwt.StandardClaims{ - ExpiresAt: time.Now().Add(ttl).Unix(), + ExpiresAt: expiresAt, Audience: "reva", IssuedAt: time.Now().Unix(), }, @@ -510,7 +509,7 @@ func (s *svc) InitiateFileDownload(ctx context.Context, req *provider.InitiateFi // TODO(labkode): calculate signature of the whole request? we only sign the URI now. Maybe worth https://tools.ietf.org/html/draft-cavage-http-signatures-11 target := u.String() - token, err := s.sign(ctx, target) + token, err := s.sign(ctx, target, time.Now().UTC().Add(time.Duration(s.c.TransferExpires)*time.Second).Unix()) if err != nil { return &gateway.InitiateFileDownloadResponse{ Status: status.NewStatusFromErrType(ctx, "error creating signature for download", err), @@ -572,7 +571,12 @@ func (s *svc) InitiateFileUpload(ctx context.Context, req *provider.InitiateFile // TODO(labkode): calculate signature of the whole request? we only sign the URI now. Maybe worth https://tools.ietf.org/html/draft-cavage-http-signatures-11 target := u.String() - token, err := s.sign(ctx, target) + ttl := time.Duration(s.c.TransferExpires) * time.Second + expiresAt := time.Now().Add(ttl).Unix() + if storageRes.Protocols[p].Expiration != nil { + expiresAt = utils.TSToTime(storageRes.Protocols[p].Expiration).Unix() + } + token, err := s.sign(ctx, target, expiresAt) if err != nil { return &gateway.InitiateFileUploadResponse{ Status: status.NewStatusFromErrType(ctx, "error creating signature for upload", err), diff --git a/internal/grpc/services/storageprovider/storageprovider.go b/internal/grpc/services/storageprovider/storageprovider.go index 5fa9191e51..db2c091cb3 100644 --- a/internal/grpc/services/storageprovider/storageprovider.go +++ b/internal/grpc/services/storageprovider/storageprovider.go @@ -28,6 +28,7 @@ import ( "path" "sort" "strconv" + "time" rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" @@ -65,6 +66,7 @@ type config struct { AvailableXS map[string]uint32 `mapstructure:"available_checksums" docs:"nil;List of available checksums."` CustomMimeTypesJSON string `mapstructure:"custom_mimetypes_json" docs:"nil;An optional mapping file with the list of supported custom file extensions and corresponding mime types."` MountID string `mapstructure:"mount_id"` + UploadExpiration int64 `mapstructure:"upload_expiration" docs:"0;Duration for how long uploads will be valid."` } func (c *config) init() { @@ -347,6 +349,13 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate // pass on the provider it to be persisted with the upload info. that is required to correlate the upload with the proper provider later on metadata["providerID"] = s.conf.MountID + var expirationTimestamp *typesv1beta1.Timestamp + if s.conf.UploadExpiration > 0 { + expirationTimestamp = &typesv1beta1.Timestamp{ + Seconds: uint64(time.Now().UTC().Add(time.Duration(s.conf.UploadExpiration) * time.Second).Unix()), + } + metadata["expires"] = strconv.Itoa(int(expirationTimestamp.Seconds)) + } uploadIDs, err := s.storage.InitiateUpload(ctx, req.Ref, uploadLength, metadata) if err != nil { @@ -393,6 +402,7 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate UploadEndpoint: u.String(), AvailableChecksums: s.availableXS, Expose: s.conf.ExposeDataServer, + Expiration: expirationTimestamp, } i++ log.Info().Str("data-server", u.String()). diff --git a/pkg/rhttp/datatx/manager/tus/tus.go b/pkg/rhttp/datatx/manager/tus/tus.go index 684888a6fd..1febf18474 100644 --- a/pkg/rhttp/datatx/manager/tus/tus.go +++ b/pkg/rhttp/datatx/manager/tus/tus.go @@ -21,6 +21,7 @@ package tus import ( "context" "net/http" + "path" "path/filepath" "github.com/pkg/errors" @@ -28,6 +29,7 @@ import ( userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/cs3org/reva/v2/internal/http/services/owncloud/ocdav/net" "github.com/cs3org/reva/v2/pkg/appctx" "github.com/cs3org/reva/v2/pkg/errtypes" "github.com/cs3org/reva/v2/pkg/events" @@ -135,6 +137,7 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) { case "HEAD": handler.HeadFile(w, r) case "PATCH": + setExpiresHeader(fs, w, r) handler.PatchFile(w, r) case "DELETE": handler.DelFile(w, r) @@ -153,3 +156,27 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) { type composable interface { UseIn(composer *tusd.StoreComposer) } + +func setExpiresHeader(fs storage.FS, w http.ResponseWriter, r *http.Request) { + ctx := context.Background() + id := path.Base(r.URL.Path) + datastore, ok := fs.(tusd.DataStore) + if !ok { + appctx.GetLogger(ctx).Error().Interface("fs", fs).Msg("storage is not a tus datastore") + return + } + upload, err := datastore.GetUpload(context.Background(), id) + if err != nil { + appctx.GetLogger(ctx).Error().Err(err).Msg("could not get upload from storage") + return + } + info, err := upload.GetInfo(ctx) + if err != nil { + appctx.GetLogger(ctx).Error().Err(err).Msg("could not get upload info for upload") + return + } + expires := info.MetaData["expires"] + if expires != "" { + w.Header().Set(net.HeaderTusUploadExpires, expires) + } +} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index d02b912b5f..c355631a2e 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -23,6 +23,8 @@ import ( "io" "net/url" + tusd "github.com/tus/tusd/pkg/handler" + userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" registry "github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1" @@ -75,6 +77,12 @@ type FS interface { DeleteStorageSpace(ctx context.Context, req *provider.DeleteStorageSpaceRequest) error } +// UploadsManager defines the interface for FS implementations that allow for managing uploads +type UploadsManager interface { + ListUploads() ([]tusd.FileInfo, error) + PurgeExpiredUploads(chan<- tusd.FileInfo) error +} + // Registry is the interface that storage registries implement // for discovering storage providers type Registry interface { diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go index 3e6c01e20f..0d2f15179b 100644 --- a/pkg/storage/utils/decomposedfs/upload.go +++ b/pkg/storage/utils/decomposedfs/upload.go @@ -32,9 +32,13 @@ import ( "io/ioutil" "os" "path/filepath" + "regexp" + "strconv" "strings" "time" + tusd "github.com/tus/tusd/pkg/handler" + userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/v2/pkg/appctx" @@ -50,7 +54,6 @@ import ( "github.com/google/uuid" "github.com/pkg/errors" "github.com/rs/zerolog" - tusd "github.com/tus/tusd/pkg/handler" ) var defaultFilePerm = os.FileMode(0664) @@ -154,6 +157,9 @@ func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Refere if mtime, ok := metadata["mtime"]; ok { info.MetaData["mtime"] = mtime } + if expiration, ok := metadata["expires"]; ok { + info.MetaData["expires"] = expiration + } if _, ok := metadata["sizedeferred"]; ok { info.SizeIsDeferred = true } @@ -335,8 +341,7 @@ func (fs *Decomposedfs) getUploadPath(ctx context.Context, uploadID string) (str return filepath.Join(fs.o.Root, "uploads", uploadID), nil } -// GetUpload returns the Upload for the given upload id -func (fs *Decomposedfs) GetUpload(ctx context.Context, id string) (tusd.Upload, error) { +func (fs *Decomposedfs) readInfo(id string) (tusd.FileInfo, error) { infoPath := filepath.Join(fs.o.Root, "uploads", id+".info") info := tusd.FileInfo{} @@ -346,19 +351,28 @@ func (fs *Decomposedfs) GetUpload(ctx context.Context, id string) (tusd.Upload, // Interpret os.ErrNotExist as 404 Not Found err = tusd.ErrNotFound } - return nil, err + return tusd.FileInfo{}, err } if err := json.Unmarshal(data, &info); err != nil { - return nil, err + return tusd.FileInfo{}, err } stat, err := os.Stat(info.Storage["BinPath"]) if err != nil { - return nil, err + return tusd.FileInfo{}, err } - info.Offset = stat.Size() + return info, nil +} + +// GetUpload returns the Upload for the given upload id +func (fs *Decomposedfs) GetUpload(ctx context.Context, id string) (tusd.Upload, error) { + info, err := fs.readInfo(id) + if err != nil { + return nil, err + } + u := &userpb.User{ Id: &userpb.UserId{ Idp: info.Storage["Idp"], @@ -383,12 +397,66 @@ func (fs *Decomposedfs) GetUpload(ctx context.Context, id string) (tusd.Upload, return &fileUpload{ info: info, binPath: info.Storage["BinPath"], - infoPath: infoPath, + infoPath: filepath.Join(fs.o.Root, "uploads", id+".info"), fs: fs, ctx: ctx, }, nil } +// ListUploads returns a list of all incomplete uploads +func (fs *Decomposedfs) ListUploads() ([]tusd.FileInfo, error) { + return fs.uploadInfos() +} + +func (fs *Decomposedfs) uploadInfos() ([]tusd.FileInfo, error) { + infos := []tusd.FileInfo{} + infoFiles, err := filepath.Glob(filepath.Join(fs.o.Root, "uploads", "*.info")) + if err != nil { + return nil, err + } + + idRegexp := regexp.MustCompile(".*/([^/]+).info") + for _, info := range infoFiles { + match := idRegexp.FindStringSubmatch(info) + if match == nil || len(match) < 2 { + continue + } + info, err := fs.readInfo(match[1]) + if err != nil { + return nil, err + } + infos = append(infos, info) + } + return infos, nil +} + +// PurgeExpiredUploads scans the fs for expired downloads and removes any leftovers +func (fs *Decomposedfs) PurgeExpiredUploads(purgedChan chan<- tusd.FileInfo) error { + infos, err := fs.uploadInfos() + if err != nil { + return err + } + + for _, info := range infos { + expires, err := strconv.Atoi(info.MetaData["expires"]) + if err != nil { + continue + } + if int64(expires) < time.Now().Unix() { + purgedChan <- info + err = os.Remove(info.Storage["BinPath"]) + if err != nil { + return err + } + err = os.Remove(filepath.Join(fs.o.Root, "uploads", info.ID+".info")) + if err != nil { + return err + } + } + } + return nil +} + // lookupNode looks up nodes by path. // This method can also handle lookups for paths which contain chunking information. func (fs *Decomposedfs) lookupNode(ctx context.Context, spaceRoot *node.Node, path string) (*node.Node, error) {