diff --git a/pkg/rhttp/datatx/utils/download/download.go b/pkg/rhttp/datatx/utils/download/download.go index 05d58fbef9..78bbc71a1a 100644 --- a/pkg/rhttp/datatx/utils/download/download.go +++ b/pkg/rhttp/datatx/utils/download/download.go @@ -92,13 +92,13 @@ func GetOrHeadFile(w http.ResponseWriter, r *http.Request, fs storage.FS, spaceI // TODO check preconditions like If-Range, If-Match ... var md *provider.ResourceInfo - var reader io.ReadCloser + var sendContent io.ReadCloser var err error var notModified bool // do a stat to set Content-Length and etag headers - md, reader, err = fs.Download(ctx, ref, func(md *provider.ResourceInfo) bool { + md, sendContent, err = fs.Download(ctx, ref, func(md *provider.ResourceInfo) bool { // range requests always need to open the reader to check if it is seekable if r.Header.Get("Range") != "" { return true @@ -126,8 +126,8 @@ func GetOrHeadFile(w http.ResponseWriter, r *http.Request, fs storage.FS, spaceI handleError(w, &sublog, err, "download") return } - if reader != nil { - defer reader.Close() + if sendContent != nil { + defer sendContent.Close() } if notModified { w.Header().Set(net.HeaderETag, md.Etag) @@ -165,7 +165,7 @@ func GetOrHeadFile(w http.ResponseWriter, r *http.Request, fs storage.FS, spaceI code := http.StatusOK sendSize := int64(md.Size) var s io.Seeker - if s, ok = reader.(io.Seeker); ok { + if s, ok = sendContent.(io.Seeker); ok { // tell clients they can send range requests w.Header().Set("Accept-Ranges", "bytes") } @@ -209,7 +209,7 @@ func GetOrHeadFile(w http.ResponseWriter, r *http.Request, fs storage.FS, spaceI pr, pw := io.Pipe() mw := multipart.NewWriter(pw) w.Header().Set("Content-Type", "multipart/byteranges; boundary="+mw.Boundary()) - reader = pr + sendContent = pr defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish. go func() { for _, ra := range ranges { @@ -222,7 +222,7 @@ func GetOrHeadFile(w http.ResponseWriter, r *http.Request, fs storage.FS, spaceI _ = pw.CloseWithError(err) // CloseWithError always returns nil return } - if _, err := io.CopyN(part, reader, ra.Length); err != nil { + if _, err := io.CopyN(part, sendContent, ra.Length); err != nil { _ = pw.CloseWithError(err) // CloseWithError always returns nil return } @@ -237,7 +237,7 @@ func GetOrHeadFile(w http.ResponseWriter, r *http.Request, fs storage.FS, spaceI w.Header().Set(net.HeaderContentLength, strconv.FormatInt(sendSize, 10)) } - w.Header().Set(net.HeaderContentDisposistion, net.ContentDispositionAttachment(md.Name)) + w.Header().Set(net.HeaderContentDisposistion, net.ContentDispositionAttachment(path.Base(md.Path))) w.Header().Set(net.HeaderETag, md.Etag) w.Header().Set(net.HeaderOCFileID, storagespace.FormatResourceID(md.Id)) w.Header().Set(net.HeaderOCETag, md.Etag) @@ -251,7 +251,7 @@ func GetOrHeadFile(w http.ResponseWriter, r *http.Request, fs storage.FS, spaceI if r.Method != "HEAD" { var c int64 - c, err = io.CopyN(w, reader, sendSize) + c, err = io.CopyN(w, sendContent, sendSize) if err != nil { sublog.Error().Err(err).Interface("resourceid", md.Id).Msg("error copying data to response") return diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go index 3822b90c05..23c7095f43 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -47,7 +47,6 @@ import ( "github.com/cs3org/reva/v2/pkg/logger" "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" "github.com/cs3org/reva/v2/pkg/rhttp/datatx/metrics" - "github.com/cs3org/reva/v2/pkg/rhttp/datatx/utils/download" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/utils/chunking" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/aspects" @@ -1054,62 +1053,6 @@ func (fs *Decomposedfs) Delete(ctx context.Context, ref *provider.Reference) (er return fs.tp.Delete(ctx, node) } -// ConsistentDownload returns the metadata for a resource and a callback to get the content stream matching the etag -func (fs *Decomposedfs) ConsistentDownload(ctx context.Context, ref *provider.Reference) (*provider.ResourceInfo, func(ctx context.Context, ref *provider.Reference) (io.ReadCloser, error), error) { - - ctx, span := tracer.Start(ctx, "ConsistentDownload") - defer span.End() - // check if we are trying to download a revision - // TODO the CS3 api should allow initiating a revision download - if ref.ResourceId != nil && strings.Contains(ref.ResourceId.OpaqueId, node.RevisionIDDelimiter) { - return fs.DownloadRevisionConsistent(ctx, ref, ref.ResourceId.OpaqueId) - } - - n, err := fs.lu.NodeFromResource(ctx, ref) - if err != nil { - return nil, nil, errors.Wrap(err, "Decomposedfs: error resolving ref") - } - - if !n.Exists { - err = errtypes.NotFound(filepath.Join(n.ParentID, n.Name)) - return nil, nil, err - } - - rp, err := fs.p.AssemblePermissions(ctx, n) - switch { - case err != nil: - return nil, nil, err - case !rp.InitiateFileDownload: - f, _ := storagespace.FormatReference(ref) - if rp.Stat { - return nil, nil, errtypes.PermissionDenied(f) - } - return nil, nil, errtypes.NotFound(f) - } - - mtime, err := n.GetMTime(ctx) - if err != nil { - return nil, nil, errors.Wrap(err, "Decomposedfs: error getting mtime for '"+n.ID+"'") - } - currentEtag, err := node.CalculateEtag(n.ID, mtime) - if err != nil { - return nil, nil, errors.Wrap(err, "Decomposedfs: error calculating etag for '"+n.ID+"'") - } - expectedEtag := download.EtagFromContext(ctx) - if currentEtag != expectedEtag { - return nil, nil, errtypes.Aborted(fmt.Sprintf("file changed from etag %s to %s", expectedEtag, currentEtag)) - } - md, err := n.AsResourceInfo(ctx, rp, nil, []string{"id", "name", "size", "mtime", "mimetype", "etag", "checksum"}, true) - - return md, func(ctx context.Context, ref *provider.Reference) (io.ReadCloser, error) { - reader, err := fs.tp.ReadBlob(n) - if err != nil { - return nil, errors.Wrap(err, "Decomposedfs: error download blob '"+n.ID+"'") - } - return reader, nil - }, err -} - // Download returns a reader to the specified resource func (fs *Decomposedfs) Download(ctx context.Context, ref *provider.Reference, openReaderFunc func(md *provider.ResourceInfo) bool) (*provider.ResourceInfo, io.ReadCloser, error) { ctx, span := tracer.Start(ctx, "Download") diff --git a/pkg/storage/utils/decomposedfs/node/node.go b/pkg/storage/utils/decomposedfs/node/node.go index eb68089bdd..795ce65c0e 100644 --- a/pkg/storage/utils/decomposedfs/node/node.go +++ b/pkg/storage/utils/decomposedfs/node/node.go @@ -37,12 +37,6 @@ import ( userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" - "github.com/google/uuid" - "github.com/pkg/errors" - "github.com/rogpeppe/go-internal/lockedfile" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/trace" - "github.com/cs3org/reva/v2/internal/grpc/services/storageprovider" "github.com/cs3org/reva/v2/pkg/appctx" ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" @@ -54,6 +48,11 @@ import ( "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes" "github.com/cs3org/reva/v2/pkg/storage/utils/grants" "github.com/cs3org/reva/v2/pkg/utils" + "github.com/google/uuid" + "github.com/pkg/errors" + "github.com/rogpeppe/go-internal/lockedfile" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" ) var tracer trace.Tracer @@ -810,12 +809,9 @@ func (n *Node) AsResourceInfo(ctx context.Context, rp *provider.ResourcePermissi } } - if _, ok := mdKeysMap[ChecksumsKey]; ok { - sublog.Debug().Msg("checksums should be requested in the fieldmask, please use the trace id to update your code") - fieldMaskKeysMap["checksum"] = struct{}{} - } // checksums - if _, ok := fieldMaskKeysMap[ChecksumsKey]; (nodeType == provider.ResourceType_RESOURCE_TYPE_FILE) && (returnAllMetadata || ok) { + // FIXME move to fieldmask + if _, ok := mdKeysMap[ChecksumsKey]; (nodeType == provider.ResourceType_RESOURCE_TYPE_FILE) && (returnAllMetadata || ok) { // TODO which checksum was requested? sha1 adler32 or md5? for now hardcode sha1? // TODO make ResourceInfo carry multiple checksums n.readChecksumIntoResourceChecksum(ctx, storageprovider.XSSHA1, ri) @@ -823,7 +819,7 @@ func (n *Node) AsResourceInfo(ctx context.Context, rp *provider.ResourcePermissi n.readChecksumIntoOpaque(ctx, storageprovider.XSAdler32, ri) } // quota - // FIXME move to fieldmask, but requires quota to be part of the ResourceInfo + // FIXME move to fieldmask if _, ok := mdKeysMap[QuotaKey]; (nodeType == provider.ResourceType_RESOURCE_TYPE_CONTAINER) && returnAllMetadata || ok { if n.SpaceRoot != nil && n.SpaceRoot.InternalPath() != "" { n.SpaceRoot.readQuotaIntoOpaque(ctx, ri)