From b11ba0b6b23c42328662ea7f69903fd8a09511c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Tue, 12 Jul 2022 10:27:35 +0200 Subject: [PATCH] Add a cli for listing and cleaning up expired uploads Fixes #2622 --- go.mod | 6 +- go.sum | 6 +- services/storage-users/pkg/command/root.go | 1 + services/storage-users/pkg/command/uploads.go | 150 ++++++++++++++++++ services/storage-users/pkg/config/config.go | 1 + .../pkg/config/defaults/defaultconfig.go | 7 +- .../storage-users/pkg/revaconfig/config.go | 1 + 7 files changed, 163 insertions(+), 9 deletions(-) create mode 100644 services/storage-users/pkg/command/uploads.go diff --git a/go.mod b/go.mod index 5a77407adcd..32aaa33c356 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/blevesearch/bleve/v2 v2.3.3 github.com/blevesearch/bleve_index_api v1.0.2 github.com/coreos/go-oidc/v3 v3.2.0 - github.com/cs3org/go-cs3apis v0.0.0-20220711084433-8f71d4e812a3 + github.com/cs3org/go-cs3apis v0.0.0-20220719130120-361e9f987d64 github.com/cs3org/reva/v2 v2.7.2 github.com/disintegration/imaging v1.6.2 github.com/ggwhite/go-masker v1.0.9 @@ -59,6 +59,7 @@ require ( github.com/stretchr/testify v1.8.0 github.com/test-go/testify v1.1.4 github.com/thejerf/suture/v4 v4.0.2 + github.com/tus/tusd v1.8.0 github.com/urfave/cli/v2 v2.11.0 go-micro.dev/v4 v4.7.0 go.opencensus.io v0.23.0 @@ -240,7 +241,6 @@ require ( github.com/spf13/pflag v1.0.5 // indirect github.com/stretchr/objx v0.4.0 // indirect github.com/trustelem/zxcvbn v1.0.1 // indirect - github.com/tus/tusd v1.8.0 // indirect github.com/wk8/go-ordered-map v0.2.0 // indirect github.com/xanzy/ssh-agent v0.3.1 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect @@ -266,3 +266,5 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect stash.kopano.io/kgol/kcc-go/v5 v5.0.1 // indirect ) + +replace github.com/cs3org/reva/v2 => ../reva diff --git a/go.sum b/go.sum index 6dfc9ff9f58..679ee3074b8 100644 --- a/go.sum +++ b/go.sum @@ -297,10 +297,8 @@ github.com/crewjam/httperr v0.2.0/go.mod h1:Jlz+Sg/XqBQhyMjdDiC+GNNRzZTD7x39Gu3p github.com/crewjam/saml v0.4.6 h1:XCUFPkQSJLvzyl4cW9OvpWUbRf0gE7VUpU8ZnilbeM4= github.com/crewjam/saml v0.4.6/go.mod h1:ZBOXnNPFzB3CgOkRm7Nd6IVdkG+l/wF+0ZXLqD96t1A= github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e/go.mod h1:XJEZ3/EQuI3BXTp/6DUzFr850vlxq11I6satRtz0YQ4= -github.com/cs3org/go-cs3apis v0.0.0-20220711084433-8f71d4e812a3 h1:QSQ2DGKPMChB4vHSs1Os9TnOJl21BrzKX9D5EtQfDog= -github.com/cs3org/go-cs3apis v0.0.0-20220711084433-8f71d4e812a3/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY= -github.com/cs3org/reva/v2 v2.7.2 h1:R6+tlAYDZz2ylUvq3KWBDMmsyLOD7yJpe51GzIZpWr0= -github.com/cs3org/reva/v2 v2.7.2/go.mod h1:gbbAUK+8gmyTJ8J+JWwbKbKRlpNRcoMuAXNFN0HbfIE= +github.com/cs3org/go-cs3apis v0.0.0-20220719130120-361e9f987d64 h1:cFnankJOCWndnOns4sKRG7yzH61ammK2Am6rEGWCK40= +github.com/cs3org/go-cs3apis v0.0.0-20220719130120-361e9f987d64/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY= github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8 h1:Z9lwXumT5ACSmJ7WGnFl+OMLLjpz5uR2fyz7dC255FI= github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8/go.mod h1:4abs/jPXcmJzYoYGF91JF9Uq9s/KL5n1jvFDix8KcqY= github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4= diff --git a/services/storage-users/pkg/command/root.go b/services/storage-users/pkg/command/root.go index 5ae9e6e86f5..21c3089b6cd 100644 --- a/services/storage-users/pkg/command/root.go +++ b/services/storage-users/pkg/command/root.go @@ -18,6 +18,7 @@ func GetCommands(cfg *config.Config) cli.Commands { Server(cfg), // interaction with this service + Uploads(cfg), // infos about this service Health(cfg), diff --git a/services/storage-users/pkg/command/uploads.go b/services/storage-users/pkg/command/uploads.go new file mode 100644 index 00000000000..0aa7174c348 --- /dev/null +++ b/services/storage-users/pkg/command/uploads.go @@ -0,0 +1,150 @@ +package command + +import ( + "fmt" + "os" + "strconv" + "sync" + "time" + + tusd "github.com/tus/tusd/pkg/handler" + "github.com/urfave/cli/v2" + + "github.com/cs3org/reva/v2/pkg/storage" + "github.com/cs3org/reva/v2/pkg/storage/fs/registry" + "github.com/owncloud/ocis/v2/services/storage-users/pkg/config" + "github.com/owncloud/ocis/v2/services/storage-users/pkg/config/parser" + "github.com/owncloud/ocis/v2/services/storage-users/pkg/revaconfig" +) + +func Uploads(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "uploads", + Usage: "manage uploads", + Before: func(c *cli.Context) error { + if err := parser.ParseConfig(cfg); err != nil { + fmt.Printf("%v", err) + return err + } + return nil + }, + Subcommands: []*cli.Command{ + ListUploads(cfg), + PurgeExpiredUploads(cfg), + }, + } +} + +// ListUploads prints a list of all incomplete uploads +func ListUploads(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "list", + Usage: fmt.Sprintf("Print a list of all incomplete uploads"), + Category: "services", + Before: func(c *cli.Context) error { + err := parser.ParseConfig(cfg) + if err != nil { + fmt.Printf("%v", err) + os.Exit(1) + } + return err + }, + Action: func(c *cli.Context) error { + f, ok := registry.NewFuncs[cfg.Driver] + if !ok { + fmt.Fprintf(os.Stderr, "Unknown filesystem driver '%s'\n", cfg.Driver) + os.Exit(1) + } + drivers := revaconfig.UserDrivers(cfg) + fs, err := f(drivers[cfg.Driver].(map[string]interface{})) + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to initialize filesystem driver '%s'\n", cfg.Driver) + return err + } + + managingFS, ok := fs.(storage.UploadsManager) + if !ok { + fmt.Fprintf(os.Stderr, "'%s' storage does not support listing expired uploads\n", cfg.Driver) + os.Exit(1) + } + + uploads, err := managingFS.ListUploads() + if err != nil { + return err + } + + fmt.Println("Incomplete uploads:") + for _, u := range uploads { + fmt.Printf(" - %s (%s, Size: %d, Expires: %s)\n", u.ID, u.MetaData["filename"], u.Size, expiredString(u.MetaData["expires"])) + } + return nil + }, + } +} + +// PurgeExpiredUploads is the entry point for the server command. +func PurgeExpiredUploads(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "purge", + Usage: fmt.Sprintf("Let %s extension clean up leftovers from expired downloads", cfg.Service.Name), + Category: "services", + Before: func(c *cli.Context) error { + err := parser.ParseConfig(cfg) + if err != nil { + fmt.Printf("%v", err) + os.Exit(1) + } + return err + }, + Action: func(c *cli.Context) error { + f, ok := registry.NewFuncs[cfg.Driver] + if !ok { + fmt.Fprintf(os.Stderr, "Unknown filesystem driver '%s'\n", cfg.Driver) + os.Exit(1) + } + drivers := revaconfig.UserDrivers(cfg) + fs, err := f(drivers[cfg.Driver].(map[string]interface{})) + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to initialize filesystem driver '%s'\n", cfg.Driver) + return err + } + + managingFS, ok := fs.(storage.UploadsManager) + if !ok { + fmt.Fprintf(os.Stderr, "'%s' storage does not support purging expired uploads\n", cfg.Driver) + os.Exit(1) + } + + wg := sync.WaitGroup{} + wg.Add(1) + purgedChannel := make(chan tusd.FileInfo) + + go func() { + for purged := range purgedChannel { + fmt.Printf("Purging %s (Filename: %s, Size: %d, Expires: %s)\n", + purged.ID, purged.MetaData["filename"], purged.Size, expiredString(purged.MetaData["expires"])) + + } + wg.Done() + }() + + err = managingFS.PurgeExpiredUploads(purgedChannel) + close(purgedChannel) + wg.Wait() + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to purge expired uploads '%s'\n", err) + return err + } + return nil + }, + } +} + +func expiredString(e string) string { + expired := "N/A" + iExpires, err := strconv.Atoi(e) + if err == nil { + expired = time.Unix(int64(iExpires), 0).Format(time.RFC3339) + } + return expired +} diff --git a/services/storage-users/pkg/config/config.go b/services/storage-users/pkg/config/config.go index f13122af927..fe9128e0ddd 100644 --- a/services/storage-users/pkg/config/config.go +++ b/services/storage-users/pkg/config/config.go @@ -28,6 +28,7 @@ type Config struct { MountID string `yaml:"mount_id" env:"STORAGE_USERS_MOUNT_ID" desc:"Mount ID of this storage."` ExposeDataServer bool `yaml:"expose_data_server" env:"STORAGE_USERS_EXPOSE_DATA_SERVER" desc:"Exposes the data server directly to users and bypasses the data gateway. Ensure that the data server address is reachable by users."` ReadOnly bool `yaml:"readonly" env:"STORAGE_USERS_READ_ONLY" desc:"Set this storage to be read-only."` + UploadExpiration int64 `yaml:"upload_expiration" env:"STORAGE_USERS_UPLOAD_EXPIRATION" desc:"Duration after which uploads will expire."` Supervised bool `yaml:"-"` Context context.Context `yaml:"-"` diff --git a/services/storage-users/pkg/config/defaults/defaultconfig.go b/services/storage-users/pkg/config/defaults/defaultconfig.go index e792c92077a..6448a8b82aa 100644 --- a/services/storage-users/pkg/config/defaults/defaultconfig.go +++ b/services/storage-users/pkg/config/defaults/defaultconfig.go @@ -39,9 +39,10 @@ func DefaultConfig() *config.Config { Reva: &config.Reva{ Address: "127.0.0.1:9142", }, - DataServerURL: "http://localhost:9158/data", - MountID: "1284d238-aa92-42ce-bdc4-0b0000009157", - Driver: "ocis", + DataServerURL: "http://localhost:9158/data", + MountID: "1284d238-aa92-42ce-bdc4-0b0000009157", + UploadExpiration: 24 * 60 * 60, + Driver: "ocis", Drivers: config.Drivers{ OwnCloudSQL: config.OwnCloudSQLDriver{ Root: filepath.Join(defaults.BaseDataPath(), "storage", "owncloud"), diff --git a/services/storage-users/pkg/revaconfig/config.go b/services/storage-users/pkg/revaconfig/config.go index 21ce7f2e736..c6a2a713eb4 100644 --- a/services/storage-users/pkg/revaconfig/config.go +++ b/services/storage-users/pkg/revaconfig/config.go @@ -29,6 +29,7 @@ func StorageUsersConfigFromStruct(cfg *config.Config) map[string]interface{} { "mount_id": cfg.MountID, "expose_data_server": cfg.ExposeDataServer, "data_server_url": cfg.DataServerURL, + "upload_expiration": cfg.UploadExpiration, }, }, "interceptors": map[string]interface{}{