From 4e264b14133bbb846a2c9d68bd6ddea0d4b50df8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Mon, 11 Dec 2023 15:36:00 +0100 Subject: [PATCH] add more upload session filters MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- services/storage-users/pkg/command/uploads.go | 145 +++++++++++++++++- 1 file changed, 143 insertions(+), 2 deletions(-) diff --git a/services/storage-users/pkg/command/uploads.go b/services/storage-users/pkg/command/uploads.go index 8217a669c4a..45d40150059 100644 --- a/services/storage-users/pkg/command/uploads.go +++ b/services/storage-users/pkg/command/uploads.go @@ -1,12 +1,16 @@ package command import ( + "encoding/json" "fmt" "os" + "strings" "sync" + "time" "github.com/urfave/cli/v2" + userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/fs/registry" "github.com/owncloud/ocis/v2/ocis-pkg/config/configlog" @@ -22,6 +26,7 @@ func Uploads(cfg *config.Config) *cli.Command { Usage: "manage unfinished uploads", Subcommands: []*cli.Command{ ListUploads(cfg), + ListUploadSessions(cfg), PurgeExpiredUploads(cfg), }, } @@ -31,7 +36,7 @@ func Uploads(cfg *config.Config) *cli.Command { func ListUploads(cfg *config.Config) *cli.Command { return &cli.Command{ Name: "list", - Usage: "Print a list of all incomplete uploads", + Usage: "Print a list of all incomplete uploads (deprecated, use sessions)", Before: func(c *cli.Context) error { return configlog.ReturnFatal(parser.ParseConfig(cfg)) }, @@ -50,7 +55,7 @@ func ListUploads(cfg *config.Config) *cli.Command { managingFS, ok := fs.(storage.UploadSessionLister) if !ok { - fmt.Fprintf(os.Stderr, "'%s' storage does not support listing expired uploads\n", cfg.Driver) + fmt.Fprintf(os.Stderr, "'%s' storage does not support listing upload sessions\n", cfg.Driver) os.Exit(1) } falseValue := false @@ -69,6 +74,142 @@ func ListUploads(cfg *config.Config) *cli.Command { } } +// ListUploadSessions prints a list of upload sessiens +func ListUploadSessions(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "sessions", + Usage: "Print a list of upload sessions", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "id", + Usage: "filter sessions by upload session id", + }, + &cli.BoolFlag{ + Name: "processing", + DisableDefaultText: true, + Usage: "filter sessions by processing status", + }, + &cli.BoolFlag{ + Name: "expired", + DisableDefaultText: true, + Usage: "filter sessions by expired status", + }, + &cli.StringFlag{ + Name: "output", + Usage: "output format to use (can be 'plain' or 'json', experimental)", + Value: "plain", + DefaultText: "plain", + }, + }, + Before: func(c *cli.Context) error { + return configlog.ReturnFatal(parser.ParseConfig(cfg)) + }, + Action: func(c *cli.Context) error { + f, ok := registry.NewFuncs[cfg.Driver] + if !ok { + fmt.Fprintf(os.Stderr, "Unknown filesystem driver '%s'\n", cfg.Driver) + os.Exit(1) + } + drivers := revaconfig.StorageProviderDrivers(cfg) + fs, err := f(drivers[cfg.Driver].(map[string]interface{}), nil) + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to initialize filesystem driver '%s'\n", cfg.Driver) + return err + } + + managingFS, ok := fs.(storage.UploadSessionLister) + if !ok { + fmt.Fprintf(os.Stderr, "'%s' storage does not support listing upload sessions\n", cfg.Driver) + os.Exit(1) + } + var b strings.Builder + filter := storage.UploadSessionFilter{} + if c.IsSet("processing") { + processingValue := c.Bool("processing") + filter.Processing = &processingValue + if !processingValue { + b.WriteString("Not ") + } + if b.Len() == 0 { + b.WriteString("Processing ") + } else { + b.WriteString("processing ") + } + } + if c.IsSet("expired") { + expiredValue := c.Bool("expired") + filter.Expired = &expiredValue + if !expiredValue { + if b.Len() == 0 { + b.WriteString("Not ") + } else { + b.WriteString(", not ") + } + } + if b.Len() == 0 { + b.WriteString("Expired ") + } else { + b.WriteString("expired ") + } + } + if b.Len() == 0 { + b.WriteString("Sessions") + } else { + b.WriteString("sessions") + } + if c.IsSet("id") { + idValue := c.String("id") + filter.ID = &idValue + b.WriteString(" with id '" + idValue + "'") + } + b.WriteString(":") + uploads, err := managingFS.ListUploadSessions(c.Context, filter) + if err != nil { + return err + } + + asJson := c.String("output") == "json" + if !asJson { + fmt.Println(b.String()) + } + for _, u := range uploads { + ref := u.Reference() + if asJson { + s := struct { + ID string `json:"id"` + Space string `json:"space"` + Filename string `json:"filename"` + Offset int64 `json:"offset"` + Size int64 `json:"size"` + Executant userpb.UserId `json:"executant"` + SpaceOwner *userpb.UserId `json:"spaceowner,omitempty"` + Expires time.Time `json:"expires"` + Processing bool `json:"processing"` + }{ + Space: ref.GetResourceId().GetSpaceId(), + ID: u.ID(), + Filename: u.Filename(), + Offset: u.Offset(), + Size: u.Size(), + Executant: u.Executant(), + SpaceOwner: u.SpaceOwner(), + Expires: u.Expires(), + Processing: u.IsProcessing(), + } + j, err := json.Marshal(s) + if err != nil { + fmt.Println(err) + } + fmt.Println(string(j)) + } else { + fmt.Printf(" - %s (Space: %s, Name: %s, Size: %d/%d, Expires: %s, Processing: %t)\n", ref.GetResourceId().GetSpaceId(), u.ID(), u.Filename(), u.Offset(), u.Size(), u.Expires(), u.IsProcessing()) + } + } + return nil + }, + } +} + // PurgeExpiredUploads is the entry point for the server command. func PurgeExpiredUploads(cfg *config.Config) *cli.Command { return &cli.Command{