Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add more upload session filters #7933

Merged
merged 5 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions services/storage-users/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,32 +36,37 @@ When using Infinite Scale as user storage, a directory named `storage/users/uplo

Example cases for expired uploads

* When a user uploads a big file but the file exceeds the user-quota, the upload can't be moved to the target after it has finished. The file stays at the upload location until it is manually cleared.
* In the final step the upload blob is moved from the upload area to the final blobstore (e.g. S3).

* If the bandwidth is limited and the file to transfer can't be transferred completely before the upload expiration time is reached, the file expires and can't be processed.

There are two commands available to manage unfinished uploads
The admin can restart the postprocessing for this with the postprocessing cli.

The storage users service can only list and clean upload sessions:

```bash
ocis storage-users uploads <command>
```

```plaintext
COMMANDS:
list Print a list of all incomplete uploads
clean Clean up leftovers from expired uploads
sessions Print a list of upload sessions
clean Clean up leftovers from expired uploads
list Print a list of all incomplete uploads (deprecated)
```

#### Command Examples

Command to identify incomplete uploads
Command to list ongoing upload sessions

```bash
ocis storage-users uploads list
ocis storage-users sessions --expired=false
```

```plaintext
Incomplete uploads:
- 455bd640-cd08-46e8-a5a0-9304908bd40a (file_example_PPT_1MB.ppt, Size: 1028608, Expires: 2022-08-17T12:35:34+02:00)
Expired sessions:
- b921cccc-bf52-478c-aba0-7d213c4cd3a0 (Space: 068ca28f-5c2c-42c3-87ff-f3e1e94dd429, Name: file.txt, Size: 2/3, Expires: 2024-01-23 14:07:05 +0100 CET, Processing: false)
- 4c510ada-c86b-4815-8820-42cdf82c3d51 (Space: 074f9e32-63f0-48e7-bef6-75a37096b2db, Name: logs_cs3org_reva_3695_14_6.log, Size: 275672/275672, Expires: 2023-12-19 11:43:16 +0100 CET, Processing: true)
```

Command to clear expired uploads
Expand All @@ -74,6 +79,17 @@ Cleaned uploads:
- 455bd640-cd08-46e8-a5a0-9304908bd40a (Filename: file_example_PPT_1MB.ppt, Size: 1028608, Expires: 2022-08-17T12:35:34+02:00)
```

Deprecated list command to identify unfinished uploads

```bash
ocis storage-users uploads list
```

```plaintext
Incomplete uploads:
- 455bd640-cd08-46e8-a5a0-9304908bd40a (file_example_PPT_1MB.ppt, Size: 1028608, Expires: 2022-08-17T12:35:34+02:00)
```

### Purge Expired Space Trash-Bins Items

<!-- referencing: https://github.com/owncloud/ocis/pull/5500 -->
Expand Down
147 changes: 144 additions & 3 deletions services/storage-users/pkg/command/uploads.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package command

import (
"encoding/json"
"fmt"
"os"
"strings"
"sync"
"time"

"github.com/urfave/cli/v2"

userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/fs/registry"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
Expand All @@ -22,6 +26,7 @@ func Uploads(cfg *config.Config) *cli.Command {
Usage: "manage unfinished uploads",
Subcommands: []*cli.Command{
ListUploads(cfg),
ListUploadSessions(cfg),
PurgeExpiredUploads(cfg),
},
}
Expand All @@ -31,7 +36,7 @@ func Uploads(cfg *config.Config) *cli.Command {
func ListUploads(cfg *config.Config) *cli.Command {
return &cli.Command{
Name: "list",
Usage: "Print a list of all incomplete uploads",
Usage: "Print a list of all incomplete uploads (deprecated, use sessions)",
kobergj marked this conversation as resolved.
Show resolved Hide resolved
Before: func(c *cli.Context) error {
return configlog.ReturnFatal(parser.ParseConfig(cfg))
},
Expand All @@ -50,7 +55,7 @@ func ListUploads(cfg *config.Config) *cli.Command {

managingFS, ok := fs.(storage.UploadSessionLister)
if !ok {
fmt.Fprintf(os.Stderr, "'%s' storage does not support listing expired uploads\n", cfg.Driver)
fmt.Fprintf(os.Stderr, "'%s' storage does not support listing upload sessions\n", cfg.Driver)
os.Exit(1)
}
expired := false
Expand All @@ -69,7 +74,143 @@ func ListUploads(cfg *config.Config) *cli.Command {
}
}

// PurgeExpiredUploads is the entry point for the server command.
// ListUploadSessions prints a list of upload sessiens
func ListUploadSessions(cfg *config.Config) *cli.Command {
return &cli.Command{
Name: "sessions",
Usage: "Print a list of upload sessions",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "id",
DefaultText: "unset",
Usage: "filter sessions by upload session id",
},
&cli.BoolFlag{
Name: "processing",
DefaultText: "unset",
Usage: "filter sessions by processing status",
},
&cli.BoolFlag{
Name: "expired",
DefaultText: "unset",
Usage: "filter sessions by expired status",
},
&cli.StringFlag{
Name: "output",
Usage: "output format to use (can be 'plain' or 'json', experimental)",
kobergj marked this conversation as resolved.
Show resolved Hide resolved
Value: "plain",
},
},
Before: func(c *cli.Context) error {
return configlog.ReturnFatal(parser.ParseConfig(cfg))
},
Action: func(c *cli.Context) error {
f, ok := registry.NewFuncs[cfg.Driver]
if !ok {
fmt.Fprintf(os.Stderr, "Unknown filesystem driver '%s'\n", cfg.Driver)
os.Exit(1)
}
drivers := revaconfig.StorageProviderDrivers(cfg)
fs, err := f(drivers[cfg.Driver].(map[string]interface{}), nil)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to initialize filesystem driver '%s'\n", cfg.Driver)
return err
}

managingFS, ok := fs.(storage.UploadSessionLister)
if !ok {
fmt.Fprintf(os.Stderr, "'%s' storage does not support listing upload sessions\n", cfg.Driver)
os.Exit(1)
}
var b strings.Builder
kobergj marked this conversation as resolved.
Show resolved Hide resolved
filter := storage.UploadSessionFilter{}
if c.IsSet("processing") {
processingValue := c.Bool("processing")
filter.Processing = &processingValue
if !processingValue {
b.WriteString("Not ")
}
if b.Len() == 0 {
b.WriteString("Processing ")
} else {
b.WriteString("processing ")
}
}
if c.IsSet("expired") {
expiredValue := c.Bool("expired")
filter.Expired = &expiredValue
if !expiredValue {
if b.Len() == 0 {
b.WriteString("Not ")
} else {
b.WriteString(", not ")
}
}
if b.Len() == 0 {
b.WriteString("Expired ")
} else {
b.WriteString("expired ")
}
}
if b.Len() == 0 {
b.WriteString("Sessions")
} else {
b.WriteString("sessions")
}
if c.IsSet("id") {
idValue := c.String("id")
filter.ID = &idValue
b.WriteString(" with id '" + idValue + "'")
}
b.WriteString(":")
uploads, err := managingFS.ListUploadSessions(c.Context, filter)
if err != nil {
return err
}

asJson := c.String("output") == "json"
if !asJson {
fmt.Println(b.String())
}
for _, u := range uploads {
ref := u.Reference()
if asJson {
s := struct {
ID string `json:"id"`
Space string `json:"space"`
Filename string `json:"filename"`
Offset int64 `json:"offset"`
Size int64 `json:"size"`
Executant userpb.UserId `json:"executant"`
SpaceOwner *userpb.UserId `json:"spaceowner,omitempty"`
Expires time.Time `json:"expires"`
Processing bool `json:"processing"`
}{
Space: ref.GetResourceId().GetSpaceId(),
ID: u.ID(),
Filename: u.Filename(),
Offset: u.Offset(),
Size: u.Size(),
Executant: u.Executant(),
SpaceOwner: u.SpaceOwner(),
Expires: u.Expires(),
Processing: u.IsProcessing(),
}
j, err := json.Marshal(s)
if err != nil {
fmt.Println(err)
}
fmt.Println(string(j))
} else {
fmt.Printf(" - %s (Space: %s, Name: %s, Size: %d/%d, Expires: %s, Processing: %t)\n", ref.GetResourceId().GetSpaceId(), u.ID(), u.Filename(), u.Offset(), u.Size(), u.Expires(), u.IsProcessing())
}
}
return nil
},
}
}

// PurgeExpiredUploads is the entry point for the clean command
func PurgeExpiredUploads(cfg *config.Config) *cli.Command {
return &cli.Command{
Name: "clean",
Expand Down