Skip to content

Commit

Permalink
harden uploads (cs3org#3899)
Browse files Browse the repository at this point in the history
Signed-off-by: Jörn Friedrich Dreyer <[email protected]>
  • Loading branch information
butonic authored and 2403905 committed May 19, 2023
1 parent df4217b commit 8d4f0f4
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 20 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/harden-uploads.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bugfix: harden uploads

Uploads now check response headers for a file id and omit a subsequent stat request which might land on a storage provider that does not yet see the new file due to latency, eg. when NFS caches direntries.

https://github.com/cs3org/reva/pull/3899
35 changes: 21 additions & 14 deletions internal/http/services/appprovider/appprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
typespb "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/cs3org/reva/v2/internal/http/services/datagateway"
"github.com/cs3org/reva/v2/internal/http/services/owncloud/ocdav/net"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/rgrpc/status"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
Expand Down Expand Up @@ -283,26 +284,32 @@ func (s *svc) handleNew(w http.ResponseWriter, r *http.Request) {
return
}

// Stat the newly created file
statRes, err := client.Stat(ctx, statFileReq)
if err != nil {
writeError(w, r, appErrorServerError, "statting the created file failed", err)
return
}
var fileid string
if httpRes.Header.Get(net.HeaderOCFileID) != "" {
fileid = httpRes.Header.Get(net.HeaderOCFileID)
} else {
// Stat the newly created file
statRes, err := client.Stat(ctx, statFileReq)
if err != nil {
writeError(w, r, appErrorServerError, "statting the created file failed", err)
return
}

if statRes.Status.Code != rpc.Code_CODE_OK {
writeError(w, r, appErrorServerError, "statting the created file failed", nil)
return
}
if statRes.Status.Code != rpc.Code_CODE_OK {
writeError(w, r, appErrorServerError, "statting the created file failed", nil)
return
}

if statRes.Info.Type != provider.ResourceType_RESOURCE_TYPE_FILE {
writeError(w, r, appErrorInvalidParameter, "the given file id does not point to a file", nil)
return
if statRes.Info.Type != provider.ResourceType_RESOURCE_TYPE_FILE {
writeError(w, r, appErrorInvalidParameter, "the given file id does not point to a file", nil)
return
}
fileid = storagespace.FormatResourceID(*statRes.Info.Id)
}

js, err := json.Marshal(
map[string]interface{}{
"file_id": storagespace.FormatResourceID(*statRes.Info.Id),
"file_id": fileid,
},
)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions internal/http/services/owncloud/ocdav/tus.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,12 @@ func (s *svc) handleTusPost(ctx context.Context, w http.ResponseWriter, r *http.
if length == 0 || httpRes.Header.Get(net.HeaderUploadOffset) == r.Header.Get(net.HeaderUploadLength) {
// get uploaded file metadata

if resid, err := storagespace.ParseID(httpRes.Header.Get(net.HeaderOCFileID)); err == nil {
sReq.Ref = &provider.Reference{
ResourceId: &resid,
}
}

sRes, err := s.gwClient.Stat(ctx, sReq)
if err != nil {
log.Error().Err(err).Msg("error sending grpc stat request")
Expand Down
12 changes: 10 additions & 2 deletions pkg/rhttp/datatx/manager/tus/tus.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/cs3org/reva/v2/pkg/rhttp/datatx/metrics"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/cache"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/mitchellh/mapstructure"
)
Expand Down Expand Up @@ -144,6 +145,7 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
metrics.UploadsActive.Sub(1)
}()
// set etag, mtime and file id
setHeaders(fs, w, r)
handler.PostFile(w, r)
case "HEAD":
handler.HeadFile(w, r)
Expand All @@ -153,7 +155,7 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
metrics.UploadsActive.Sub(1)
}()
// set etag, mtime and file id
setExpiresHeader(fs, w, r)
setHeaders(fs, w, r)
handler.PatchFile(w, r)
case "DELETE":
handler.DelFile(w, r)
Expand All @@ -180,7 +182,7 @@ type composable interface {
UseIn(composer *tusd.StoreComposer)
}

func setExpiresHeader(fs storage.FS, w http.ResponseWriter, r *http.Request) {
func setHeaders(fs storage.FS, w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
id := path.Base(r.URL.Path)
datastore, ok := fs.(tusd.DataStore)
Expand All @@ -202,4 +204,10 @@ func setExpiresHeader(fs storage.FS, w http.ResponseWriter, r *http.Request) {
if expires != "" {
w.Header().Set(net.HeaderTusUploadExpires, expires)
}
resourceid := provider.ResourceId{
StorageId: info.MetaData["providerID"],
SpaceId: info.Storage["SpaceRoot"],
OpaqueId: info.Storage["NodeId"],
}
w.Header().Set(net.HeaderOCFileID, storagespace.FormatResourceID(resourceid))
}
12 changes: 8 additions & 4 deletions pkg/storage/utils/decomposedfs/upload/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func New(ctx context.Context, info tusd.FileInfo, lu *lookup.Lookup, tp Tree, p
"BinPath": binPath,

"NodeId": n.ID,
"NodeExists": "true",
"NodeParentId": n.ParentID,
"NodeName": n.Name,
"SpaceRoot": spaceRoot,
Expand All @@ -166,6 +167,11 @@ func New(ctx context.Context, info tusd.FileInfo, lu *lookup.Lookup, tp Tree, p

"LogLevel": log.GetLevel().String(),
}
if !n.Exists {
// fill future node info
info.Storage["NodeId"] = uuid.New().String()
info.Storage["NodeExists"] = "false"
}
// Create binary file in the upload folder with no content
log.Debug().Interface("info", info).Msg("Decomposedfs: built storage info")
file, err := os.OpenFile(binPath, os.O_CREATE|os.O_WRONLY, defaultFilePerm)
Expand Down Expand Up @@ -267,8 +273,8 @@ func CreateNodeForUpload(upload *Upload, initAttrs node.Attributes) (*node.Node,
}

var f *lockedfile.File
switch n.ID {
case "":
switch upload.Info.Storage["NodeExists"] {
case "false":
f, err = initNewNode(upload, n, uint64(fsize))
default:
f, err = updateExistingNode(upload, n, spaceID, uint64(fsize))
Expand Down Expand Up @@ -320,8 +326,6 @@ func CreateNodeForUpload(upload *Upload, initAttrs node.Attributes) (*node.Node,
}

func initNewNode(upload *Upload, n *node.Node, fsize uint64) (*lockedfile.File, error) {
n.ID = uuid.New().String()

// create folder structure (if needed)
if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil {
return nil, err
Expand Down

0 comments on commit 8d4f0f4

Please sign in to comment.