Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup uploads #3095

Merged
merged 9 commits into from
Aug 3, 2022
9 changes: 9 additions & 0 deletions changelog/unreleased/upload-expiration.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Enhancement: Upload expiration and cleanup

We made storage providers aware of upload expiration and added an interface
for FS which support listing and purging expired uploads.

We also implemented said interface for decomposedfs.

https://github.com/cs3org/reva/pull/3095
https://github.com/owncloud/ocis/pull/4256
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/cheggaaa/pb v1.0.29
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e
github.com/cs3org/go-cs3apis v0.0.0-20220711084433-8f71d4e812a3
github.com/cs3org/go-cs3apis v0.0.0-20220719130120-361e9f987d64
github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8
github.com/dgraph-io/ristretto v0.1.0
github.com/eventials/go-tus v0.0.0-20200718001131-45c7ec8f5d59
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e h1:tqSPWQeueWTKnJVMJffz4pz0o1WuQxJ28+5x5JgaHD8=
github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e/go.mod h1:XJEZ3/EQuI3BXTp/6DUzFr850vlxq11I6satRtz0YQ4=
github.com/cs3org/go-cs3apis v0.0.0-20220711084433-8f71d4e812a3 h1:QSQ2DGKPMChB4vHSs1Os9TnOJl21BrzKX9D5EtQfDog=
github.com/cs3org/go-cs3apis v0.0.0-20220711084433-8f71d4e812a3/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
github.com/cs3org/go-cs3apis v0.0.0-20220719130120-361e9f987d64 h1:cFnankJOCWndnOns4sKRG7yzH61ammK2Am6rEGWCK40=
github.com/cs3org/go-cs3apis v0.0.0-20220719130120-361e9f987d64/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8 h1:Z9lwXumT5ACSmJ7WGnFl+OMLLjpz5uR2fyz7dC255FI=
github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8/go.mod h1:4abs/jPXcmJzYoYGF91JF9Uq9s/KL5n1jvFDix8KcqY=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
Expand Down
14 changes: 9 additions & 5 deletions internal/grpc/services/gateway/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,12 @@ type transferClaims struct {
Target string `json:"target"`
}

func (s *svc) sign(_ context.Context, target string) (string, error) {
func (s *svc) sign(_ context.Context, target string, expiresAt int64) (string, error) {
// Tus sends a separate request to the datagateway service for every chunk.
// For large files, this can take a long time, so we extend the expiration
ttl := time.Duration(s.c.TransferExpires) * time.Second
claims := transferClaims{
StandardClaims: jwt.StandardClaims{
ExpiresAt: time.Now().Add(ttl).Unix(),
ExpiresAt: expiresAt,
Audience: "reva",
IssuedAt: time.Now().Unix(),
},
Expand Down Expand Up @@ -510,7 +509,7 @@ func (s *svc) InitiateFileDownload(ctx context.Context, req *provider.InitiateFi

// TODO(labkode): calculate signature of the whole request? we only sign the URI now. Maybe worth https://tools.ietf.org/html/draft-cavage-http-signatures-11
target := u.String()
token, err := s.sign(ctx, target)
token, err := s.sign(ctx, target, time.Now().UTC().Add(time.Duration(s.c.TransferExpires)*time.Second).Unix())
if err != nil {
return &gateway.InitiateFileDownloadResponse{
Status: status.NewStatusFromErrType(ctx, "error creating signature for download", err),
Expand Down Expand Up @@ -572,7 +571,12 @@ func (s *svc) InitiateFileUpload(ctx context.Context, req *provider.InitiateFile

// TODO(labkode): calculate signature of the whole request? we only sign the URI now. Maybe worth https://tools.ietf.org/html/draft-cavage-http-signatures-11
target := u.String()
token, err := s.sign(ctx, target)
ttl := time.Duration(s.c.TransferExpires) * time.Second
expiresAt := time.Now().Add(ttl).Unix()
if storageRes.Protocols[p].Expiration != nil {
expiresAt = utils.TSToTime(storageRes.Protocols[p].Expiration).Unix()
kobergj marked this conversation as resolved.
Show resolved Hide resolved
}
token, err := s.sign(ctx, target, expiresAt)
if err != nil {
return &gateway.InitiateFileUploadResponse{
Status: status.NewStatusFromErrType(ctx, "error creating signature for upload", err),
Expand Down
10 changes: 10 additions & 0 deletions internal/grpc/services/storageprovider/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"path"
"sort"
"strconv"
"time"

rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
Expand Down Expand Up @@ -65,6 +66,7 @@ type config struct {
AvailableXS map[string]uint32 `mapstructure:"available_checksums" docs:"nil;List of available checksums."`
CustomMimeTypesJSON string `mapstructure:"custom_mimetypes_json" docs:"nil;An optional mapping file with the list of supported custom file extensions and corresponding mime types."`
MountID string `mapstructure:"mount_id"`
UploadExpiration int64 `mapstructure:"upload_expiration" docs:"0;Duration for how long uploads will be valid."`
kobergj marked this conversation as resolved.
Show resolved Hide resolved
}

func (c *config) init() {
Expand Down Expand Up @@ -347,6 +349,13 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate

// pass on the provider it to be persisted with the upload info. that is required to correlate the upload with the proper provider later on
metadata["providerID"] = s.conf.MountID
var expirationTimestamp *typesv1beta1.Timestamp
if s.conf.UploadExpiration > 0 {
expirationTimestamp = &typesv1beta1.Timestamp{
Seconds: uint64(time.Now().UTC().Add(time.Duration(s.conf.UploadExpiration) * time.Second).Unix()),
}
metadata["expires"] = strconv.Itoa(int(expirationTimestamp.Seconds))
}

uploadIDs, err := s.storage.InitiateUpload(ctx, req.Ref, uploadLength, metadata)
if err != nil {
Expand Down Expand Up @@ -393,6 +402,7 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate
UploadEndpoint: u.String(),
AvailableChecksums: s.availableXS,
Expose: s.conf.ExposeDataServer,
Expiration: expirationTimestamp,
}
i++
log.Info().Str("data-server", u.String()).
Expand Down
27 changes: 27 additions & 0 deletions pkg/rhttp/datatx/manager/tus/tus.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ package tus
import (
"context"
"net/http"
"path"
"path/filepath"

"github.com/pkg/errors"
tusd "github.com/tus/tusd/pkg/handler"

userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/internal/http/services/owncloud/ocdav/net"
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/events"
Expand Down Expand Up @@ -135,6 +137,7 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
case "HEAD":
handler.HeadFile(w, r)
case "PATCH":
setExpiresHeader(fs, w, r)
handler.PatchFile(w, r)
case "DELETE":
handler.DelFile(w, r)
Expand All @@ -153,3 +156,27 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
type composable interface {
UseIn(composer *tusd.StoreComposer)
}

func setExpiresHeader(fs storage.FS, w http.ResponseWriter, r *http.Request) {
ctx := context.Background()
id := path.Base(r.URL.Path)
datastore, ok := fs.(tusd.DataStore)
if !ok {
appctx.GetLogger(ctx).Error().Interface("fs", fs).Msg("storage is not a tus datastore")
return
}
upload, err := datastore.GetUpload(context.Background(), id)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Msg("could not get upload from storage")
return
}
info, err := upload.GetInfo(ctx)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Msg("could not get upload info for upload")
return
}
expires := info.MetaData["expires"]
if expires != "" {
w.Header().Set(net.HeaderTusUploadExpires, expires)
}
}
8 changes: 8 additions & 0 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"io"
"net/url"

tusd "github.com/tus/tusd/pkg/handler"

userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
registry "github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1"
Expand Down Expand Up @@ -75,6 +77,12 @@ type FS interface {
DeleteStorageSpace(ctx context.Context, req *provider.DeleteStorageSpaceRequest) error
}

// UploadsManager defines the interface for FS implementations that allow for managing uploads
type UploadsManager interface {
ListUploads() ([]tusd.FileInfo, error)
PurgeExpiredUploads(chan<- tusd.FileInfo) error
}

// Registry is the interface that storage registries implement
// for discovering storage providers
type Registry interface {
Expand Down
84 changes: 76 additions & 8 deletions pkg/storage/utils/decomposedfs/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@ import (
"io/ioutil"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"

tusd "github.com/tus/tusd/pkg/handler"

userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/appctx"
Expand All @@ -50,7 +54,6 @@ import (
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/rs/zerolog"
tusd "github.com/tus/tusd/pkg/handler"
)

var defaultFilePerm = os.FileMode(0664)
Expand Down Expand Up @@ -154,6 +157,9 @@ func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Refere
if mtime, ok := metadata["mtime"]; ok {
info.MetaData["mtime"] = mtime
}
if expiration, ok := metadata["expires"]; ok {
info.MetaData["expires"] = expiration
}
if _, ok := metadata["sizedeferred"]; ok {
info.SizeIsDeferred = true
}
Expand Down Expand Up @@ -335,8 +341,7 @@ func (fs *Decomposedfs) getUploadPath(ctx context.Context, uploadID string) (str
return filepath.Join(fs.o.Root, "uploads", uploadID), nil
}

// GetUpload returns the Upload for the given upload id
func (fs *Decomposedfs) GetUpload(ctx context.Context, id string) (tusd.Upload, error) {
func (fs *Decomposedfs) readInfo(id string) (tusd.FileInfo, error) {
infoPath := filepath.Join(fs.o.Root, "uploads", id+".info")

info := tusd.FileInfo{}
Expand All @@ -346,19 +351,28 @@ func (fs *Decomposedfs) GetUpload(ctx context.Context, id string) (tusd.Upload,
// Interpret os.ErrNotExist as 404 Not Found
err = tusd.ErrNotFound
}
return nil, err
return tusd.FileInfo{}, err
}
if err := json.Unmarshal(data, &info); err != nil {
return nil, err
return tusd.FileInfo{}, err
}

stat, err := os.Stat(info.Storage["BinPath"])
if err != nil {
return nil, err
return tusd.FileInfo{}, err
}

info.Offset = stat.Size()

return info, nil
}

// GetUpload returns the Upload for the given upload id
func (fs *Decomposedfs) GetUpload(ctx context.Context, id string) (tusd.Upload, error) {
info, err := fs.readInfo(id)
if err != nil {
return nil, err
}

u := &userpb.User{
Id: &userpb.UserId{
Idp: info.Storage["Idp"],
Expand All @@ -383,12 +397,66 @@ func (fs *Decomposedfs) GetUpload(ctx context.Context, id string) (tusd.Upload,
return &fileUpload{
info: info,
binPath: info.Storage["BinPath"],
infoPath: infoPath,
infoPath: filepath.Join(fs.o.Root, "uploads", id+".info"),
fs: fs,
ctx: ctx,
}, nil
}

// ListUploads returns a list of all incomplete uploads
func (fs *Decomposedfs) ListUploads() ([]tusd.FileInfo, error) {
return fs.uploadInfos()
}

func (fs *Decomposedfs) uploadInfos() ([]tusd.FileInfo, error) {
infos := []tusd.FileInfo{}
infoFiles, err := filepath.Glob(filepath.Join(fs.o.Root, "uploads", "*.info"))
if err != nil {
return nil, err
}

idRegexp := regexp.MustCompile(".*/([^/]+).info")
for _, info := range infoFiles {
match := idRegexp.FindStringSubmatch(info)
if match == nil || len(match) < 2 {
continue
}
info, err := fs.readInfo(match[1])
if err != nil {
return nil, err
}
infos = append(infos, info)
}
return infos, nil
}

// PurgeExpiredUploads scans the fs for expired downloads and removes any leftovers
func (fs *Decomposedfs) PurgeExpiredUploads(purgedChan chan<- tusd.FileInfo) error {
kobergj marked this conversation as resolved.
Show resolved Hide resolved
infos, err := fs.uploadInfos()
if err != nil {
return err
}

for _, info := range infos {
expires, err := strconv.Atoi(info.MetaData["expires"])
if err != nil {
continue
}
if int64(expires) < time.Now().Unix() {
purgedChan <- info
err = os.Remove(info.Storage["BinPath"])
if err != nil {
return err
}
err = os.Remove(filepath.Join(fs.o.Root, "uploads", info.ID+".info"))
if err != nil {
return err
}
}
}
return nil
}

// lookupNode looks up nodes by path.
// This method can also handle lookups for paths which contain chunking information.
func (fs *Decomposedfs) lookupNode(ctx context.Context, spaceRoot *node.Node, path string) (*node.Node, error) {
Expand Down