Skip to content

Commit

Permalink
Merge pull request #3095 from aduffeck/cleanup-uploads
Browse files Browse the repository at this point in the history
Cleanup uploads
  • Loading branch information
aduffeck authored Aug 3, 2022
2 parents 13a6567 + 0855632 commit 28724e4
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 16 deletions.
9 changes: 9 additions & 0 deletions changelog/unreleased/upload-expiration.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Enhancement: Upload expiration and cleanup

We made storage providers aware of upload expiration and added an interface
for FS which support listing and purging expired uploads.

We also implemented said interface for decomposedfs.

https://github.com/cs3org/reva/pull/3095
https://github.com/owncloud/ocis/pull/4256
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/cheggaaa/pb v1.0.29
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e
github.com/cs3org/go-cs3apis v0.0.0-20220711084433-8f71d4e812a3
github.com/cs3org/go-cs3apis v0.0.0-20220719130120-361e9f987d64
github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8
github.com/dgraph-io/ristretto v0.1.0
github.com/eventials/go-tus v0.0.0-20220610120217-05d0564bb571
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsr
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e h1:tqSPWQeueWTKnJVMJffz4pz0o1WuQxJ28+5x5JgaHD8=
github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e/go.mod h1:XJEZ3/EQuI3BXTp/6DUzFr850vlxq11I6satRtz0YQ4=
github.com/cs3org/go-cs3apis v0.0.0-20220711084433-8f71d4e812a3 h1:QSQ2DGKPMChB4vHSs1Os9TnOJl21BrzKX9D5EtQfDog=
github.com/cs3org/go-cs3apis v0.0.0-20220711084433-8f71d4e812a3/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
github.com/cs3org/go-cs3apis v0.0.0-20220719130120-361e9f987d64 h1:cFnankJOCWndnOns4sKRG7yzH61ammK2Am6rEGWCK40=
github.com/cs3org/go-cs3apis v0.0.0-20220719130120-361e9f987d64/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8 h1:Z9lwXumT5ACSmJ7WGnFl+OMLLjpz5uR2fyz7dC255FI=
github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8/go.mod h1:4abs/jPXcmJzYoYGF91JF9Uq9s/KL5n1jvFDix8KcqY=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
14 changes: 9 additions & 5 deletions internal/grpc/services/gateway/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,12 @@ type transferClaims struct {
Target string `json:"target"`
}

func (s *svc) sign(_ context.Context, target string) (string, error) {
func (s *svc) sign(_ context.Context, target string, expiresAt int64) (string, error) {
// Tus sends a separate request to the datagateway service for every chunk.
// For large files, this can take a long time, so we extend the expiration
ttl := time.Duration(s.c.TransferExpires) * time.Second
claims := transferClaims{
StandardClaims: jwt.StandardClaims{
ExpiresAt: time.Now().Add(ttl).Unix(),
ExpiresAt: expiresAt,
Audience: "reva",
IssuedAt: time.Now().Unix(),
},
Expand Down Expand Up @@ -510,7 +509,7 @@ func (s *svc) InitiateFileDownload(ctx context.Context, req *provider.InitiateFi

// TODO(labkode): calculate signature of the whole request? we only sign the URI now. Maybe worth https://tools.ietf.org/html/draft-cavage-http-signatures-11
target := u.String()
token, err := s.sign(ctx, target)
token, err := s.sign(ctx, target, time.Now().UTC().Add(time.Duration(s.c.TransferExpires)*time.Second).Unix())
if err != nil {
return &gateway.InitiateFileDownloadResponse{
Status: status.NewStatusFromErrType(ctx, "error creating signature for download", err),
Expand Down Expand Up @@ -572,7 +571,12 @@ func (s *svc) InitiateFileUpload(ctx context.Context, req *provider.InitiateFile

// TODO(labkode): calculate signature of the whole request? we only sign the URI now. Maybe worth https://tools.ietf.org/html/draft-cavage-http-signatures-11
target := u.String()
token, err := s.sign(ctx, target)
ttl := time.Duration(s.c.TransferExpires) * time.Second
expiresAt := time.Now().Add(ttl).Unix()
if storageRes.Protocols[p].Expiration != nil {
expiresAt = utils.TSToTime(storageRes.Protocols[p].Expiration).Unix()
}
token, err := s.sign(ctx, target, expiresAt)
if err != nil {
return &gateway.InitiateFileUploadResponse{
Status: status.NewStatusFromErrType(ctx, "error creating signature for upload", err),
Expand Down
10 changes: 10 additions & 0 deletions internal/grpc/services/storageprovider/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"path"
"sort"
"strconv"
"time"

rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
Expand Down Expand Up @@ -65,6 +66,7 @@ type config struct {
AvailableXS map[string]uint32 `mapstructure:"available_checksums" docs:"nil;List of available checksums."`
CustomMimeTypesJSON string `mapstructure:"custom_mimetypes_json" docs:"nil;An optional mapping file with the list of supported custom file extensions and corresponding mime types."`
MountID string `mapstructure:"mount_id"`
UploadExpiration int64 `mapstructure:"upload_expiration" docs:"0;Duration for how long uploads will be valid."`
}

func (c *config) init() {
Expand Down Expand Up @@ -347,6 +349,13 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate

// pass on the provider it to be persisted with the upload info. that is required to correlate the upload with the proper provider later on
metadata["providerID"] = s.conf.MountID
var expirationTimestamp *typesv1beta1.Timestamp
if s.conf.UploadExpiration > 0 {
expirationTimestamp = &typesv1beta1.Timestamp{
Seconds: uint64(time.Now().UTC().Add(time.Duration(s.conf.UploadExpiration) * time.Second).Unix()),
}
metadata["expires"] = strconv.Itoa(int(expirationTimestamp.Seconds))
}

uploadIDs, err := s.storage.InitiateUpload(ctx, req.Ref, uploadLength, metadata)
if err != nil {
Expand Down Expand Up @@ -393,6 +402,7 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate
UploadEndpoint: u.String(),
AvailableChecksums: s.availableXS,
Expose: s.conf.ExposeDataServer,
Expiration: expirationTimestamp,
}
i++
log.Info().Str("data-server", u.String()).
Expand Down
27 changes: 27 additions & 0 deletions pkg/rhttp/datatx/manager/tus/tus.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ package tus
import (
"context"
"net/http"
"path"
"path/filepath"

"github.com/pkg/errors"
tusd "github.com/tus/tusd/pkg/handler"

userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/internal/http/services/owncloud/ocdav/net"
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/events"
Expand Down Expand Up @@ -135,6 +137,7 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
case "HEAD":
handler.HeadFile(w, r)
case "PATCH":
setExpiresHeader(fs, w, r)
handler.PatchFile(w, r)
case "DELETE":
handler.DelFile(w, r)
Expand All @@ -153,3 +156,27 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
type composable interface {
UseIn(composer *tusd.StoreComposer)
}

func setExpiresHeader(fs storage.FS, w http.ResponseWriter, r *http.Request) {
ctx := context.Background()
id := path.Base(r.URL.Path)
datastore, ok := fs.(tusd.DataStore)
if !ok {
appctx.GetLogger(ctx).Error().Interface("fs", fs).Msg("storage is not a tus datastore")
return
}
upload, err := datastore.GetUpload(context.Background(), id)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Msg("could not get upload from storage")
return
}
info, err := upload.GetInfo(ctx)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Msg("could not get upload info for upload")
return
}
expires := info.MetaData["expires"]
if expires != "" {
w.Header().Set(net.HeaderTusUploadExpires, expires)
}
}
8 changes: 8 additions & 0 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"io"
"net/url"

tusd "github.com/tus/tusd/pkg/handler"

userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
registry "github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1"
Expand Down Expand Up @@ -75,6 +77,12 @@ type FS interface {
DeleteStorageSpace(ctx context.Context, req *provider.DeleteStorageSpaceRequest) error
}

// UploadsManager defines the interface for FS implementations that allow for managing uploads
type UploadsManager interface {
ListUploads() ([]tusd.FileInfo, error)
PurgeExpiredUploads(chan<- tusd.FileInfo) error
}

// Registry is the interface that storage registries implement
// for discovering storage providers
type Registry interface {
Expand Down
84 changes: 76 additions & 8 deletions pkg/storage/utils/decomposedfs/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@ import (
"io/ioutil"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"

tusd "github.com/tus/tusd/pkg/handler"

userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/appctx"
Expand All @@ -50,7 +54,6 @@ import (
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/rs/zerolog"
tusd "github.com/tus/tusd/pkg/handler"
)

var defaultFilePerm = os.FileMode(0664)
Expand Down Expand Up @@ -154,6 +157,9 @@ func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Refere
if mtime, ok := metadata["mtime"]; ok {
info.MetaData["mtime"] = mtime
}
if expiration, ok := metadata["expires"]; ok {
info.MetaData["expires"] = expiration
}
if _, ok := metadata["sizedeferred"]; ok {
info.SizeIsDeferred = true
}
Expand Down Expand Up @@ -335,8 +341,7 @@ func (fs *Decomposedfs) getUploadPath(ctx context.Context, uploadID string) (str
return filepath.Join(fs.o.Root, "uploads", uploadID), nil
}

// GetUpload returns the Upload for the given upload id
func (fs *Decomposedfs) GetUpload(ctx context.Context, id string) (tusd.Upload, error) {
func (fs *Decomposedfs) readInfo(id string) (tusd.FileInfo, error) {
infoPath := filepath.Join(fs.o.Root, "uploads", id+".info")

info := tusd.FileInfo{}
Expand All @@ -346,19 +351,28 @@ func (fs *Decomposedfs) GetUpload(ctx context.Context, id string) (tusd.Upload,
// Interpret os.ErrNotExist as 404 Not Found
err = tusd.ErrNotFound
}
return nil, err
return tusd.FileInfo{}, err
}
if err := json.Unmarshal(data, &info); err != nil {
return nil, err
return tusd.FileInfo{}, err
}

stat, err := os.Stat(info.Storage["BinPath"])
if err != nil {
return nil, err
return tusd.FileInfo{}, err
}

info.Offset = stat.Size()

return info, nil
}

// GetUpload returns the Upload for the given upload id
func (fs *Decomposedfs) GetUpload(ctx context.Context, id string) (tusd.Upload, error) {
info, err := fs.readInfo(id)
if err != nil {
return nil, err
}

u := &userpb.User{
Id: &userpb.UserId{
Idp: info.Storage["Idp"],
Expand All @@ -383,12 +397,66 @@ func (fs *Decomposedfs) GetUpload(ctx context.Context, id string) (tusd.Upload,
return &fileUpload{
info: info,
binPath: info.Storage["BinPath"],
infoPath: infoPath,
infoPath: filepath.Join(fs.o.Root, "uploads", id+".info"),
fs: fs,
ctx: ctx,
}, nil
}

// ListUploads returns a list of all incomplete uploads
func (fs *Decomposedfs) ListUploads() ([]tusd.FileInfo, error) {
return fs.uploadInfos()
}

func (fs *Decomposedfs) uploadInfos() ([]tusd.FileInfo, error) {
infos := []tusd.FileInfo{}
infoFiles, err := filepath.Glob(filepath.Join(fs.o.Root, "uploads", "*.info"))
if err != nil {
return nil, err
}

idRegexp := regexp.MustCompile(".*/([^/]+).info")
for _, info := range infoFiles {
match := idRegexp.FindStringSubmatch(info)
if match == nil || len(match) < 2 {
continue
}
info, err := fs.readInfo(match[1])
if err != nil {
return nil, err
}
infos = append(infos, info)
}
return infos, nil
}

// PurgeExpiredUploads scans the fs for expired downloads and removes any leftovers
func (fs *Decomposedfs) PurgeExpiredUploads(purgedChan chan<- tusd.FileInfo) error {
infos, err := fs.uploadInfos()
if err != nil {
return err
}

for _, info := range infos {
expires, err := strconv.Atoi(info.MetaData["expires"])
if err != nil {
continue
}
if int64(expires) < time.Now().Unix() {
purgedChan <- info
err = os.Remove(info.Storage["BinPath"])
if err != nil {
return err
}
err = os.Remove(filepath.Join(fs.o.Root, "uploads", info.ID+".info"))
if err != nil {
return err
}
}
}
return nil
}

// lookupNode looks up nodes by path.
// This method can also handle lookups for paths which contain chunking information.
func (fs *Decomposedfs) lookupNode(ctx context.Context, spaceRoot *node.Node, path string) (*node.Node, error) {
Expand Down

0 comments on commit 28724e4

Please sign in to comment.