Skip to content

Commit

Permalink
change Download signature
Browse files Browse the repository at this point in the history
Signed-off-by: Jörn Friedrich Dreyer <[email protected]>
  • Loading branch information
butonic committed Nov 11, 2024
1 parent adabb5a commit 372b288
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 103 deletions.
82 changes: 38 additions & 44 deletions pkg/rhttp/datatx/utils/download/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,40 +92,47 @@ func GetOrHeadFile(w http.ResponseWriter, r *http.Request, fs storage.FS, spaceI
// TODO check preconditions like If-Range, If-Match ...

var md *provider.ResourceInfo
var getContent func(ctx context.Context, ref *provider.Reference) (io.ReadCloser, error)
var reader io.ReadCloser
var err error
var notModified bool

// do a stat to set Content-Length and etag headers

if fscd, ok := fs.(storage.ConsistentDownloader); ok {
md, getContent, err = fscd.ConsistentDownload(ctx, ref)
if err != nil {
handleError(w, &sublog, err, "download")
return
md, reader, err = fs.Download(ctx, ref, func(md *provider.ResourceInfo) bool {
// range requests always need to open the reader to check if it is seekable
if r.Header.Get("Range") != "" {
return true
}
} else {
// There is a race condition here: between the stat and the download the file could have been updated again
// this is ok if the HeaderIfNoneMatch header is set but we should then also update the etag
// maybe we can read the etag from the download response headers?
if md, err = fs.GetMD(ctx, ref, nil, []string{"size", "mimetype", "etag"}); err != nil {
handleError(w, &sublog, err, "stat")
return
// otherwise, HEAD requests do not need to open a reader
if r.Method == "HEAD" {
return false
}
getContent = fs.Download
}

// check etag, see https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/If-None-Match
for _, etag := range r.Header.Values(net.HeaderIfNoneMatch) {
if md.Etag == etag {
// When the condition fails for GET and HEAD methods, then the server must return
// HTTP status code 304 (Not Modified). [...] Note that the server generating a
// 304 response MUST generate any of the following header fields that would have
// been sent in a 200 (OK) response to the same request:
// Cache-Control, Content-Location, Date, ETag, Expires, and Vary.
w.Header().Set(net.HeaderETag, md.Etag)
w.WriteHeader(http.StatusNotModified)
return
// check etag, see https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/If-None-Match
for _, etag := range r.Header.Values(net.HeaderIfNoneMatch) {
if md.Etag == etag {
// When the condition fails for GET and HEAD methods, then the server must return
// HTTP status code 304 (Not Modified). [...] Note that the server generating a
// 304 response MUST generate any of the following header fields that would have
// been sent in a 200 (OK) response to the same request:
// Cache-Control, Content-Location, Date, ETag, Expires, and Vary.
notModified = true
return false
}
}
return true
})
if err != nil {
handleError(w, &sublog, err, "download")
return
}
if reader != nil {
defer reader.Close()
}
if notModified {
w.Header().Set(net.HeaderETag, md.Etag)
w.WriteHeader(http.StatusNotModified)
return
}

// fill in storage provider id if it is missing
Expand Down Expand Up @@ -155,23 +162,10 @@ func GetOrHeadFile(w http.ResponseWriter, r *http.Request, fs storage.FS, spaceI
}
}

// There is a race condition here: between the stat and the download the file could have been updated again
// this is ok if the HeaderIfNoneMatch header is set but we should then also update the etag
// maybe we can read the etag from the download response headers?
ctx = ContextWithEtag(ctx, md.Etag)
content, err := getContent(ctx, ref)
if err != nil {
handleError(w, &sublog, err, "download")
return
}
defer content.Close()

code := http.StatusOK
sendSize := int64(md.Size)
var sendContent io.Reader = content

var s io.Seeker
if s, ok = content.(io.Seeker); ok {
if s, ok = reader.(io.Seeker); ok {
// tell clients they can send range requests
w.Header().Set("Accept-Ranges", "bytes")
}
Expand Down Expand Up @@ -215,7 +209,7 @@ func GetOrHeadFile(w http.ResponseWriter, r *http.Request, fs storage.FS, spaceI
pr, pw := io.Pipe()
mw := multipart.NewWriter(pw)
w.Header().Set("Content-Type", "multipart/byteranges; boundary="+mw.Boundary())
sendContent = pr
reader = pr
defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish.
go func() {
for _, ra := range ranges {
Expand All @@ -228,7 +222,7 @@ func GetOrHeadFile(w http.ResponseWriter, r *http.Request, fs storage.FS, spaceI
_ = pw.CloseWithError(err) // CloseWithError always returns nil
return
}
if _, err := io.CopyN(part, content, ra.Length); err != nil {
if _, err := io.CopyN(part, reader, ra.Length); err != nil {
_ = pw.CloseWithError(err) // CloseWithError always returns nil
return
}
Expand All @@ -243,7 +237,7 @@ func GetOrHeadFile(w http.ResponseWriter, r *http.Request, fs storage.FS, spaceI
w.Header().Set(net.HeaderContentLength, strconv.FormatInt(sendSize, 10))
}

w.Header().Set(net.HeaderContentDisposistion, net.ContentDispositionAttachment(path.Base(md.Path)))
w.Header().Set(net.HeaderContentDisposistion, net.ContentDispositionAttachment(md.Name))
w.Header().Set(net.HeaderETag, md.Etag)
w.Header().Set(net.HeaderOCFileID, storagespace.FormatResourceID(md.Id))
w.Header().Set(net.HeaderOCETag, md.Etag)
Expand All @@ -257,7 +251,7 @@ func GetOrHeadFile(w http.ResponseWriter, r *http.Request, fs storage.FS, spaceI

if r.Method != "HEAD" {
var c int64
c, err = io.CopyN(w, sendContent, sendSize)
c, err = io.CopyN(w, reader, sendSize)
if err != nil {
sublog.Error().Err(err).Interface("resourceid", md.Id).Msg("error copying data to response")
return
Expand Down
9 changes: 2 additions & 7 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type FS interface {
// ListFolder returns the resource infos for all children of the referenced resource
ListFolder(ctx context.Context, ref *provider.Reference, mdKeys, fieldMask []string) ([]*provider.ResourceInfo, error)
// Download returns a ReadCloser for the content of the referenced resource
Download(ctx context.Context, ref *provider.Reference) (io.ReadCloser, error)
Download(ctx context.Context, ref *provider.Reference, openReaderfunc func(*provider.ResourceInfo) bool) (*provider.ResourceInfo, io.ReadCloser, error)

// GetPathByID returns the path for the given resource id relative to the space root
// It should only reveal the path visible to the current user to not leak the names of unshared parent resources
Expand Down Expand Up @@ -79,7 +79,7 @@ type FS interface {
// ListRevisions lists all revisions for the referenced resource
ListRevisions(ctx context.Context, ref *provider.Reference) ([]*provider.FileVersion, error)
// DownloadRevision downloads a revision
DownloadRevision(ctx context.Context, ref *provider.Reference, key string) (io.ReadCloser, error)
DownloadRevision(ctx context.Context, ref *provider.Reference, key string, openReaderFunc func(md *provider.ResourceInfo) bool) (*provider.ResourceInfo, io.ReadCloser, error)
// RestoreRevision restores a revision
RestoreRevision(ctx context.Context, ref *provider.Reference, key string) error

Expand Down Expand Up @@ -168,8 +168,3 @@ type PathWrapper interface {
Unwrap(ctx context.Context, rp string) (string, error)
Wrap(ctx context.Context, rp string) (string, error)
}

type ConsistentDownloader interface {
// ConsistentDownload returns the metadata for a resource and a callback to get the content stream matching the etag
ConsistentDownload(ctx context.Context, ref *provider.Reference) (*provider.ResourceInfo, func(ctx context.Context, ref *provider.Reference) (io.ReadCloser, error), error)
}
39 changes: 17 additions & 22 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1099,7 +1099,7 @@ func (fs *Decomposedfs) ConsistentDownload(ctx context.Context, ref *provider.Re
if currentEtag != expectedEtag {
return nil, nil, errtypes.Aborted(fmt.Sprintf("file changed from etag %s to %s", expectedEtag, currentEtag))
}
md, err := n.AsResourceInfo(ctx, rp, []string{}, []string{}, true)
md, err := n.AsResourceInfo(ctx, rp, nil, []string{"id", "name", "size", "mtime", "mimetype", "etag", "checksum"}, true)

return md, func(ctx context.Context, ref *provider.Reference) (io.ReadCloser, error) {
reader, err := fs.tp.ReadBlob(n)
Expand All @@ -1111,54 +1111,49 @@ func (fs *Decomposedfs) ConsistentDownload(ctx context.Context, ref *provider.Re
}

// Download returns a reader to the specified resource
func (fs *Decomposedfs) Download(ctx context.Context, ref *provider.Reference) (io.ReadCloser, error) {
func (fs *Decomposedfs) Download(ctx context.Context, ref *provider.Reference, openReaderFunc func(md *provider.ResourceInfo) bool) (*provider.ResourceInfo, io.ReadCloser, error) {
ctx, span := tracer.Start(ctx, "Download")
defer span.End()
// check if we are trying to download a revision
// TODO the CS3 api should allow initiating a revision download
if ref.ResourceId != nil && strings.Contains(ref.ResourceId.OpaqueId, node.RevisionIDDelimiter) {
return fs.DownloadRevision(ctx, ref, ref.ResourceId.OpaqueId)
return fs.DownloadRevision(ctx, ref, ref.ResourceId.OpaqueId, openReaderFunc)
}

n, err := fs.lu.NodeFromResource(ctx, ref)
if err != nil {
return nil, errors.Wrap(err, "Decomposedfs: error resolving ref")
return nil, nil, errors.Wrap(err, "Decomposedfs: error resolving ref")
}

if !n.Exists {
err = errtypes.NotFound(filepath.Join(n.ParentID, n.Name))
return nil, err
return nil, nil, err
}

rp, err := fs.p.AssemblePermissions(ctx, n)
switch {
case err != nil:
return nil, err
return nil, nil, err
case !rp.InitiateFileDownload:
f, _ := storagespace.FormatReference(ref)
if rp.Stat {
return nil, errtypes.PermissionDenied(f)
return nil, nil, errtypes.PermissionDenied(f)
}
return nil, errtypes.NotFound(f)
return nil, nil, errtypes.NotFound(f)
}

mtime, err := n.GetMTime(ctx)
ri, err := n.AsResourceInfo(ctx, rp, nil, []string{"size", "mimetype", "etag"}, true)
if err != nil {
return nil, errors.Wrap(err, "Decomposedfs: error getting mtime for '"+n.ID+"'")
}
currentEtag, err := node.CalculateEtag(n.ID, mtime)
if err != nil {
return nil, errors.Wrap(err, "Decomposedfs: error calculating etag for '"+n.ID+"'")
}
expectedEtag := download.EtagFromContext(ctx)
if currentEtag != expectedEtag {
return nil, errtypes.Aborted(fmt.Sprintf("file changed from etag %s to %s", expectedEtag, currentEtag))
return nil, nil, err
}
reader, err := fs.tp.ReadBlob(n)
if err != nil {
return nil, errors.Wrap(err, "Decomposedfs: error download blob '"+n.ID+"'")
var reader io.ReadCloser
if openReaderFunc(ri) {
reader, err = fs.tp.ReadBlob(n)
if err != nil {
return nil, nil, errors.Wrap(err, "Decomposedfs: error download blob '"+n.ID+"'")
}
}
return reader, nil
return ri, reader, nil
}

// GetLock returns an existing lock on the given reference
Expand Down
20 changes: 12 additions & 8 deletions pkg/storage/utils/decomposedfs/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ import (
userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/rogpeppe/go-internal/lockedfile"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"

"github.com/cs3org/reva/v2/internal/grpc/services/storageprovider"
"github.com/cs3org/reva/v2/pkg/appctx"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
Expand All @@ -48,11 +54,6 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/grants"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/rogpeppe/go-internal/lockedfile"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)

var tracer trace.Tracer
Expand Down Expand Up @@ -809,17 +810,20 @@ func (n *Node) AsResourceInfo(ctx context.Context, rp *provider.ResourcePermissi
}
}

if _, ok := mdKeysMap[ChecksumsKey]; ok {
sublog.Debug().Msg("checksums should be requested in the fieldmask, please use the trace id to update your code")
fieldMaskKeysMap["checksum"] = struct{}{}
}
// checksums
// FIXME move to fieldmask
if _, ok := mdKeysMap[ChecksumsKey]; (nodeType == provider.ResourceType_RESOURCE_TYPE_FILE) && (returnAllMetadata || ok) {
if _, ok := fieldMaskKeysMap[ChecksumsKey]; (nodeType == provider.ResourceType_RESOURCE_TYPE_FILE) && (returnAllMetadata || ok) {
// TODO which checksum was requested? sha1 adler32 or md5? for now hardcode sha1?
// TODO make ResourceInfo carry multiple checksums
n.readChecksumIntoResourceChecksum(ctx, storageprovider.XSSHA1, ri)
n.readChecksumIntoOpaque(ctx, storageprovider.XSMD5, ri)
n.readChecksumIntoOpaque(ctx, storageprovider.XSAdler32, ri)
}
// quota
// FIXME move to fieldmask
// FIXME move to fieldmask, but requires quota to be part of the ResourceInfo
if _, ok := mdKeysMap[QuotaKey]; (nodeType == provider.ResourceType_RESOURCE_TYPE_CONTAINER) && returnAllMetadata || ok {
if n.SpaceRoot != nil && n.SpaceRoot.InternalPath() != "" {
n.SpaceRoot.readQuotaIntoOpaque(ctx, ri)
Expand Down
30 changes: 19 additions & 11 deletions pkg/storage/utils/decomposedfs/revisions.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,54 +172,62 @@ func (fs *Decomposedfs) DownloadRevisionConsistent(ctx context.Context, ref *pro

// DownloadRevision returns a reader for the specified revision
// FIXME the CS3 api should explicitly allow initiating revision and trash download, a related issue is https://github.com/cs3org/reva/issues/1813
func (fs *Decomposedfs) DownloadRevision(ctx context.Context, ref *provider.Reference, revisionKey string) (io.ReadCloser, error) {
func (fs *Decomposedfs) DownloadRevision(ctx context.Context, ref *provider.Reference, revisionKey string, openReaderFunc func(md *provider.ResourceInfo) bool) (*provider.ResourceInfo, io.ReadCloser, error) {
log := appctx.GetLogger(ctx)

// verify revision key format
kp := strings.SplitN(revisionKey, node.RevisionIDDelimiter, 2)
if len(kp) != 2 {
log.Error().Str("revisionKey", revisionKey).Msg("malformed revisionKey")
return nil, errtypes.NotFound(revisionKey)
return nil, nil, errtypes.NotFound(revisionKey)
}
log.Debug().Str("revisionKey", revisionKey).Msg("DownloadRevision")

spaceID := ref.ResourceId.SpaceId
// check if the node is available and has not been deleted
n, err := node.ReadNode(ctx, fs.lu, spaceID, kp[0], false, nil, false)
if err != nil {
return nil, err
return nil, nil, err
}
if !n.Exists {
err = errtypes.NotFound(filepath.Join(n.ParentID, n.Name))
return nil, err
return nil, nil, err
}

rp, err := fs.p.AssemblePermissions(ctx, n)
switch {
case err != nil:
return nil, err
return nil, nil, err
case !rp.ListFileVersions || !rp.InitiateFileDownload: // TODO add explicit permission in the CS3 api?
f, _ := storagespace.FormatReference(ref)
if rp.Stat {
return nil, errtypes.PermissionDenied(f)
return nil, nil, errtypes.PermissionDenied(f)
}
return nil, errtypes.NotFound(f)
return nil, nil, errtypes.NotFound(f)
}

contentPath := fs.lu.InternalPath(spaceID, revisionKey)

blobid, blobsize, err := fs.lu.ReadBlobIDAndSizeAttr(ctx, contentPath, nil)
if err != nil {
return nil, errors.Wrapf(err, "Decomposedfs: could not read blob id and size for revision '%s' of node '%s'", n.ID, revisionKey)
return nil, nil, errors.Wrapf(err, "Decomposedfs: could not read blob id and size for revision '%s' of node '%s'", n.ID, revisionKey)
}

revisionNode := node.Node{SpaceID: spaceID, BlobID: blobid, Blobsize: blobsize} // blobsize is needed for the s3ng blobstore

reader, err := fs.tp.ReadBlob(&revisionNode)
ri, err := n.AsResourceInfo(ctx, rp, nil, []string{"size", "mimetype", "etag"}, true)
if err != nil {
return nil, errors.Wrapf(err, "Decomposedfs: could not download blob of revision '%s' for node '%s'", n.ID, revisionKey)
return nil, nil, err
}

var reader io.ReadCloser
if openReaderFunc(ri) {
reader, err = fs.tp.ReadBlob(&revisionNode)
if err != nil {
return nil, nil, errors.Wrapf(err, "Decomposedfs: could not download blob of revision '%s' for node '%s'", n.ID, revisionKey)
}
}
return reader, nil
return ri, reader, nil
}

// RestoreRevision restores the specified revision of the resource
Expand Down
Loading

0 comments on commit 372b288

Please sign in to comment.