Skip to content

Commit

Permalink
reduce changes
Browse files Browse the repository at this point in the history
Signed-off-by: Jörn Friedrich Dreyer <[email protected]>
  • Loading branch information
butonic committed Nov 11, 2024
1 parent 11b49f1 commit 3b4e40a
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 78 deletions.
18 changes: 9 additions & 9 deletions pkg/rhttp/datatx/utils/download/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,13 @@ func GetOrHeadFile(w http.ResponseWriter, r *http.Request, fs storage.FS, spaceI
// TODO check preconditions like If-Range, If-Match ...

var md *provider.ResourceInfo
var reader io.ReadCloser
var sendContent io.ReadCloser
var err error
var notModified bool

// do a stat to set Content-Length and etag headers

md, reader, err = fs.Download(ctx, ref, func(md *provider.ResourceInfo) bool {
md, sendContent, err = fs.Download(ctx, ref, func(md *provider.ResourceInfo) bool {
// range requests always need to open the reader to check if it is seekable
if r.Header.Get("Range") != "" {
return true
Expand Down Expand Up @@ -126,8 +126,8 @@ func GetOrHeadFile(w http.ResponseWriter, r *http.Request, fs storage.FS, spaceI
handleError(w, &sublog, err, "download")
return
}
if reader != nil {
defer reader.Close()
if sendContent != nil {
defer sendContent.Close()
}
if notModified {
w.Header().Set(net.HeaderETag, md.Etag)
Expand Down Expand Up @@ -165,7 +165,7 @@ func GetOrHeadFile(w http.ResponseWriter, r *http.Request, fs storage.FS, spaceI
code := http.StatusOK
sendSize := int64(md.Size)
var s io.Seeker
if s, ok = reader.(io.Seeker); ok {
if s, ok = sendContent.(io.Seeker); ok {
// tell clients they can send range requests
w.Header().Set("Accept-Ranges", "bytes")
}
Expand Down Expand Up @@ -209,7 +209,7 @@ func GetOrHeadFile(w http.ResponseWriter, r *http.Request, fs storage.FS, spaceI
pr, pw := io.Pipe()
mw := multipart.NewWriter(pw)
w.Header().Set("Content-Type", "multipart/byteranges; boundary="+mw.Boundary())
reader = pr
sendContent = pr
defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish.
go func() {
for _, ra := range ranges {
Expand All @@ -222,7 +222,7 @@ func GetOrHeadFile(w http.ResponseWriter, r *http.Request, fs storage.FS, spaceI
_ = pw.CloseWithError(err) // CloseWithError always returns nil
return
}
if _, err := io.CopyN(part, reader, ra.Length); err != nil {
if _, err := io.CopyN(part, sendContent, ra.Length); err != nil {
_ = pw.CloseWithError(err) // CloseWithError always returns nil
return
}
Expand All @@ -237,7 +237,7 @@ func GetOrHeadFile(w http.ResponseWriter, r *http.Request, fs storage.FS, spaceI
w.Header().Set(net.HeaderContentLength, strconv.FormatInt(sendSize, 10))
}

w.Header().Set(net.HeaderContentDisposistion, net.ContentDispositionAttachment(md.Name))
w.Header().Set(net.HeaderContentDisposistion, net.ContentDispositionAttachment(path.Base(md.Path)))
w.Header().Set(net.HeaderETag, md.Etag)
w.Header().Set(net.HeaderOCFileID, storagespace.FormatResourceID(md.Id))
w.Header().Set(net.HeaderOCETag, md.Etag)
Expand All @@ -251,7 +251,7 @@ func GetOrHeadFile(w http.ResponseWriter, r *http.Request, fs storage.FS, spaceI

if r.Method != "HEAD" {
var c int64
c, err = io.CopyN(w, reader, sendSize)
c, err = io.CopyN(w, sendContent, sendSize)
if err != nil {
sublog.Error().Err(err).Interface("resourceid", md.Id).Msg("error copying data to response")
return
Expand Down
57 changes: 0 additions & 57 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import (
"github.com/cs3org/reva/v2/pkg/logger"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/rhttp/datatx/metrics"
"github.com/cs3org/reva/v2/pkg/rhttp/datatx/utils/download"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/utils/chunking"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/aspects"
Expand Down Expand Up @@ -1054,62 +1053,6 @@ func (fs *Decomposedfs) Delete(ctx context.Context, ref *provider.Reference) (er
return fs.tp.Delete(ctx, node)
}

// ConsistentDownload returns the metadata for a resource and a callback to get the
// content stream matching the etag.
//
// The etag expected by the caller is read from ctx (download.EtagFromContext); if the
// node's current etag differs, an errtypes.Aborted is returned so the caller can retry
// with fresh metadata instead of streaming content that no longer matches its stat.
//
// Returns:
//   - the resource metadata (id, name, size, mtime, mimetype, etag, checksum),
//   - a callback that opens the blob for reading (caller must Close the reader),
//   - an error. NOTE(review): on the last return path the err from AsResourceInfo is
//     returned together with a non-nil callback — callers must check err before using
//     the callback; verify this is intentional.
func (fs *Decomposedfs) ConsistentDownload(ctx context.Context, ref *provider.Reference) (*provider.ResourceInfo, func(ctx context.Context, ref *provider.Reference) (io.ReadCloser, error), error) {

	ctx, span := tracer.Start(ctx, "ConsistentDownload")
	defer span.End()
	// check if we are trying to download a revision
	// TODO the CS3 api should allow initiating a revision download
	if ref.ResourceId != nil && strings.Contains(ref.ResourceId.OpaqueId, node.RevisionIDDelimiter) {
		// revision references are handled by the revision-aware variant
		return fs.DownloadRevisionConsistent(ctx, ref, ref.ResourceId.OpaqueId)
	}

	n, err := fs.lu.NodeFromResource(ctx, ref)
	if err != nil {
		return nil, nil, errors.Wrap(err, "Decomposedfs: error resolving ref")
	}

	if !n.Exists {
		err = errtypes.NotFound(filepath.Join(n.ParentID, n.Name))
		return nil, nil, err
	}

	rp, err := fs.p.AssemblePermissions(ctx, n)
	switch {
	case err != nil:
		return nil, nil, err
	case !rp.InitiateFileDownload:
		// without download permission: reveal existence only to users who may stat,
		// otherwise answer NotFound to avoid leaking the resource's existence
		f, _ := storagespace.FormatReference(ref)
		if rp.Stat {
			return nil, nil, errtypes.PermissionDenied(f)
		}
		return nil, nil, errtypes.NotFound(f)
	}

	mtime, err := n.GetMTime(ctx)
	if err != nil {
		return nil, nil, errors.Wrap(err, "Decomposedfs: error getting mtime for '"+n.ID+"'")
	}
	// the etag is derived from node id + mtime; compare against the etag the client
	// last saw so we never serve bytes that do not match the advertised etag
	currentEtag, err := node.CalculateEtag(n.ID, mtime)
	if err != nil {
		return nil, nil, errors.Wrap(err, "Decomposedfs: error calculating etag for '"+n.ID+"'")
	}
	expectedEtag := download.EtagFromContext(ctx)
	if currentEtag != expectedEtag {
		// file changed between the caller's stat and this download attempt
		return nil, nil, errtypes.Aborted(fmt.Sprintf("file changed from etag %s to %s", expectedEtag, currentEtag))
	}
	md, err := n.AsResourceInfo(ctx, rp, nil, []string{"id", "name", "size", "mtime", "mimetype", "etag", "checksum"}, true)

	// NOTE(review): the callback captures n and ignores its ctx/ref parameters —
	// presumably the signature only mirrors a shared callback type; confirm.
	return md, func(ctx context.Context, ref *provider.Reference) (io.ReadCloser, error) {
		reader, err := fs.tp.ReadBlob(n)
		if err != nil {
			return nil, errors.Wrap(err, "Decomposedfs: error download blob '"+n.ID+"'")
		}
		return reader, nil
	}, err
}

// Download returns a reader to the specified resource
func (fs *Decomposedfs) Download(ctx context.Context, ref *provider.Reference, openReaderFunc func(md *provider.ResourceInfo) bool) (*provider.ResourceInfo, io.ReadCloser, error) {
ctx, span := tracer.Start(ctx, "Download")
Expand Down
20 changes: 8 additions & 12 deletions pkg/storage/utils/decomposedfs/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,6 @@ import (
userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/rogpeppe/go-internal/lockedfile"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"

"github.com/cs3org/reva/v2/internal/grpc/services/storageprovider"
"github.com/cs3org/reva/v2/pkg/appctx"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
Expand All @@ -54,6 +48,11 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/grants"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/rogpeppe/go-internal/lockedfile"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)

var tracer trace.Tracer
Expand Down Expand Up @@ -810,20 +809,17 @@ func (n *Node) AsResourceInfo(ctx context.Context, rp *provider.ResourcePermissi
}
}

if _, ok := mdKeysMap[ChecksumsKey]; ok {
sublog.Debug().Msg("checksums should be requested in the fieldmask, please use the trace id to update your code")
fieldMaskKeysMap["checksum"] = struct{}{}
}
// checksums
if _, ok := fieldMaskKeysMap[ChecksumsKey]; (nodeType == provider.ResourceType_RESOURCE_TYPE_FILE) && (returnAllMetadata || ok) {
// FIXME move to fieldmask
if _, ok := mdKeysMap[ChecksumsKey]; (nodeType == provider.ResourceType_RESOURCE_TYPE_FILE) && (returnAllMetadata || ok) {
// TODO which checksum was requested? sha1 adler32 or md5? for now hardcode sha1?
// TODO make ResourceInfo carry multiple checksums
n.readChecksumIntoResourceChecksum(ctx, storageprovider.XSSHA1, ri)
n.readChecksumIntoOpaque(ctx, storageprovider.XSMD5, ri)
n.readChecksumIntoOpaque(ctx, storageprovider.XSAdler32, ri)
}
// quota
// FIXME move to fieldmask, but requires quota to be part of the ResourceInfo
// FIXME move to fieldmask
if _, ok := mdKeysMap[QuotaKey]; (nodeType == provider.ResourceType_RESOURCE_TYPE_CONTAINER) && returnAllMetadata || ok {
if n.SpaceRoot != nil && n.SpaceRoot.InternalPath() != "" {
n.SpaceRoot.readQuotaIntoOpaque(ctx, ri)
Expand Down

0 comments on commit 3b4e40a

Please sign in to comment.