From 0f1596fa1f77c993ca2985c756347ebf16dc6838 Mon Sep 17 00:00:00 2001 From: Hugo Gonzalez Labrador Date: Thu, 11 Jun 2020 12:06:28 +0200 Subject: [PATCH 1/5] propper packags for data transfers --- .../http/services/dataprovider/_index.md | 14 +- examples/standalone/standalone.toml | 4 +- .../services/dataprovider/dataprovider.go | 133 +++-------- internal/http/services/dataprovider/put.go | 112 --------- pkg/datatx/datatx.go | 31 +++ pkg/datatx/manager/loader/loader.go | 26 +++ pkg/datatx/manager/registry/registry.go | 34 +++ pkg/datatx/manager/simple/simple.go | 64 +++++ pkg/datatx/manager/tus/tus.go | 64 +++++ .../get.go => pkg/rhttp/datatx/datatx.go | 35 +-- pkg/rhttp/datatx/manager/loader/loader.go | 26 +++ pkg/rhttp/datatx/manager/registry/registry.go | 34 +++ pkg/rhttp/datatx/manager/simple/simple.go | 116 +++++++++ pkg/rhttp/datatx/manager/tus/tus.go | 220 ++++++++++++++++++ 14 files changed, 664 insertions(+), 249 deletions(-) delete mode 100644 internal/http/services/dataprovider/put.go create mode 100644 pkg/datatx/datatx.go create mode 100644 pkg/datatx/manager/loader/loader.go create mode 100644 pkg/datatx/manager/registry/registry.go create mode 100644 pkg/datatx/manager/simple/simple.go create mode 100644 pkg/datatx/manager/tus/tus.go rename internal/http/services/dataprovider/get.go => pkg/rhttp/datatx/datatx.go (50%) create mode 100644 pkg/rhttp/datatx/manager/loader/loader.go create mode 100644 pkg/rhttp/datatx/manager/registry/registry.go create mode 100644 pkg/rhttp/datatx/manager/simple/simple.go create mode 100644 pkg/rhttp/datatx/manager/tus/tus.go diff --git a/docs/content/en/docs/config/http/services/dataprovider/_index.md b/docs/content/en/docs/config/http/services/dataprovider/_index.md index bc99c1d11b..b46692f991 100644 --- a/docs/content/en/docs/config/http/services/dataprovider/_index.md +++ b/docs/content/en/docs/config/http/services/dataprovider/_index.md @@ -32,11 +32,19 @@ The configuration for the storage driver [[Ref]](https://github.com/cs3org/reva/ {{< /highlight >}} {{% /dir %}} -{{% dir name="disable_tus" type="bool" default=false %}} -Whether to disable TUS uploads. [[Ref]](https://github.com/cs3org/reva/tree/master/internal/http/services/dataprovider/dataprovider.go#L42) +{{% dir name="datatx" type="string" default="simple" %}} +The data transfer protocol to use [[Ref]](https://github.com/cs3org/reva/tree/master/internal/http/services/dataprovider/dataprovider.go#L42) {{< highlight toml >}} [http.services.dataprovider] -disable_tus = false +datatx = "simple" +{{< /highlight >}} +{{% /dir %}} + +{{% dir name="datatxs" type="map[string]map[string]interface{}" default="docs/config/packages/rhttp/datatx" %}} +The data transfer protocol to use [[Ref]](https://github.com/cs3org/reva/tree/master/internal/http/services/dataprovider/dataprovider.go#L43) +{{< highlight toml >}} +[http.services.dataprovider.datatxs] +"[docs/config/packages/rhttp/datatx]({{< ref "docs/config/packages/rhttp/datatx" >}})" {{< /highlight >}} {{% /dir %}} diff --git a/examples/standalone/standalone.toml b/examples/standalone/standalone.toml index 1ff9abb8ad..aa627f2c8d 100644 --- a/examples/standalone/standalone.toml +++ b/examples/standalone/standalone.toml @@ -66,7 +66,9 @@ address = "0.0.0.0:19001" [http.services.dataprovider] driver = "local" -temp_folder = "/var/tmp/" +datatx = "simple" + +[http.services.datatxs.tus] [http.services.dataprovider.drivers.local] namespace = "/var/tmp/reva/" diff --git a/internal/http/services/dataprovider/dataprovider.go b/internal/http/services/dataprovider/dataprovider.go index 36b33f026d..4502bc7e34 100644 --- a/internal/http/services/dataprovider/dataprovider.go +++ b/internal/http/services/dataprovider/dataprovider.go @@ -22,13 +22,13 @@ import ( "fmt" "net/http" - "github.com/cs3org/reva/pkg/appctx" + "github.com/cs3org/reva/pkg/rhttp/datatx" + datatxregistry "github.com/cs3org/reva/pkg/rhttp/datatx/manager/registry" "github.com/cs3org/reva/pkg/rhttp/global" "github.com/cs3org/reva/pkg/storage" "github.com/cs3org/reva/pkg/storage/fs/registry" "github.com/mitchellh/mapstructure" "github.com/rs/zerolog" - tusd "github.com/tus/tusd/pkg/handler" ) func init() { @@ -36,16 +36,18 @@ func init() { } type config struct { - Prefix string `mapstructure:"prefix" docs:"data;The prefix to be used for this HTTP service"` - Driver string `mapstructure:"driver" docs:"local;The storage driver to be used."` - Drivers map[string]map[string]interface{} `mapstructure:"drivers" docs:"url:docs/config/packages/storage/fs;The configuration for the storage driver"` - DisableTus bool `mapstructure:"disable_tus" docs:"false;Whether to disable TUS uploads."` + Prefix string `mapstructure:"prefix" docs:"data;The prefix to be used for this HTTP service"` + Driver string `mapstructure:"driver" docs:"local;The storage driver to be used."` + Drivers map[string]map[string]interface{} `mapstructure:"drivers" docs:"url:docs/config/packages/storage/fs;The configuration for the storage driver"` + DataTX string `mapstructure:"datatx" docs:"simple;The data transfer protocol to use"` + DataTXs map[string]map[string]interface{} `mapstructure:"datatxs" docs:"url:docs/config/packages/rhttp/datatx;The data transfer protocol to use"` } type svc struct { conf *config handler http.Handler storage storage.FS + datatx datatx.DataTX } // New returns a new datasvc @@ -64,11 +66,21 @@ func New(m map[string]interface{}, log *zerolog.Logger) (global.Service, error) return nil, err } + datatx, err := getDataTX(conf) + if err != nil { + return nil, err + } + s := &svc{ storage: fs, + datatx: datatx, conf: conf, } - err = s.setHandler() + + if err := s.setHandler(); err != nil { + return nil, err + } + return s, err } @@ -81,12 +93,6 @@ func (s *svc) Unprotected() []string { return []string{} } -// Create a new DataStore instance which is responsible for -// storing the uploaded file on disk in the specified directory. -// This path _must_ exist before tusd will store uploads in it. -// If you want to save them on a different medium, for example -// a remote FTP server, you can implement your own storage backend -// by implementing the tusd.DataStore interface. func getFS(c *config) (storage.FS, error) { if f, ok := registry.NewFuncs[c.Driver]; ok { return f(c.Drivers[c.Driver]) @@ -102,96 +108,19 @@ func (s *svc) Handler() http.Handler { return s.handler } -// Composable is the interface that a struct needs to implement to be composable by this composer -type Composable interface { - UseIn(composer *tusd.StoreComposer) +func (s *svc) setHandler() error { + h, err := s.datatx.Handler(s.conf.Prefix, s.storage) + if err != nil { + return err + } + + s.handler = h + return nil } -func (s *svc) setHandler() (err error) { - composable, ok := s.storage.(Composable) - if ok && !s.conf.DisableTus { - // A storage backend for tusd may consist of multiple different parts which - // handle upload creation, locking, termination and so on. The composer is a - // place where all those separated pieces are joined together. In this example - // we only use the file store but you may plug in multiple. - composer := tusd.NewStoreComposer() - - // let the composable storage tell tus which extensions it supports - composable.UseIn(composer) - - config := tusd.Config{ - BasePath: s.conf.Prefix, - StoreComposer: composer, - //Logger: logger, // TODO use logger - } - - handler, err := tusd.NewUnroutedHandler(config) - if err != nil { - return err - } - - s.handler = handler.Middleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - - log := appctx.GetLogger(r.Context()) - log.Info().Msgf("tusd routing: path=%s", r.URL.Path) - - method := r.Method - // https://github.com/tus/tus-resumable-upload-protocol/blob/master/protocol.md#x-http-method-override - if r.Header.Get("X-HTTP-Method-Override") != "" { - method = r.Header.Get("X-HTTP-Method-Override") - } - - switch method { - // old fashioned download. - - // GET is not part of the tus.io protocol - // currently there is no way to GET an upload that is in progress - // TODO allow range based get requests? that end before the current offset - case "GET": - s.doGet(w, r) - - // tus.io based upload - - // uploads are initiated using the CS3 APIs Initiate Download call - case "POST": - handler.PostFile(w, r) - case "HEAD": - handler.HeadFile(w, r) - case "PATCH": - handler.PatchFile(w, r) - // PUT provides a wrapper around the POST call, to save the caller from - // the trouble of configuring the tus client. - case "PUT": - s.doTusPut(w, r) - // TODO Only attach the DELETE handler if the Terminate() method is provided - case "DELETE": - handler.DelFile(w, r) - } - })) - } else { - s.handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - method := r.Method - // https://github.com/tus/tus-resumable-upload-protocol/blob/master/protocol.md#x-http-method-override - if r.Header.Get("X-HTTP-Method-Override") != "" { - method = r.Header.Get("X-HTTP-Method-Override") - } - - switch method { - case "HEAD": - w.WriteHeader(http.StatusOK) - return - case "GET": - s.doGet(w, r) - return - case "PUT": - s.doPut(w, r) - return - default: - w.WriteHeader(http.StatusNotImplemented) - return - } - }) +func getDataTX(c *config) (datatx.DataTX, error) { + if f, ok := datatxregistry.NewFuncs[c.Driver]; ok { + return f(c.DataTXs[c.DataTX]) } - - return err + return nil, fmt.Errorf("driver not found: %s", c.Driver) } diff --git a/internal/http/services/dataprovider/put.go b/internal/http/services/dataprovider/put.go deleted file mode 100644 index 6471b475d5..0000000000 --- a/internal/http/services/dataprovider/put.go +++ /dev/null @@ -1,112 +0,0 @@ -// Copyright 2018-2020 CERN -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// In applying this license, CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -package dataprovider - -import ( - "fmt" - "net/http" - "path" - "strconv" - "strings" - - provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" - "github.com/cs3org/reva/pkg/appctx" - "github.com/cs3org/reva/pkg/rhttp" - "github.com/cs3org/reva/pkg/token" - "github.com/eventials/go-tus" - "github.com/eventials/go-tus/memorystore" -) - -func (s *svc) doPut(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - log := appctx.GetLogger(ctx) - fn := r.URL.Path - - fsfn := strings.TrimPrefix(fn, s.conf.Prefix) - ref := &provider.Reference{Spec: &provider.Reference_Path{Path: fsfn}} - - err := s.storage.Upload(ctx, ref, r.Body) - if err != nil { - log.Error().Err(err).Msg("error uploading file") - w.WriteHeader(http.StatusInternalServerError) - return - } - - r.Body.Close() - w.WriteHeader(http.StatusOK) -} - -func (s *svc) doTusPut(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - log := appctx.GetLogger(ctx) - - fp := r.Header.Get("File-Path") - if fp == "" { - w.WriteHeader(http.StatusBadRequest) - return - } - - length, err := strconv.ParseInt(r.Header.Get("Content-Length"), 10, 64) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - - dataServerURL := fmt.Sprintf("http://%s%s", r.Host, r.RequestURI) - - // create the tus client. - c := tus.DefaultConfig() - c.Resume = true - c.HttpClient = rhttp.GetHTTPClient(ctx) - c.Store, err = memorystore.NewMemoryStore() - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - c.Header.Set(token.TokenHeader, token.ContextMustGetToken(ctx)) - - tusc, err := tus.NewClient(dataServerURL, c) - if err != nil { - log.Error().Err(err).Msg("error starting TUS client") - w.WriteHeader(http.StatusInternalServerError) - return - } - - metadata := map[string]string{ - "filename": path.Base(fp), - "dir": path.Dir(fp), - } - - upload := tus.NewUpload(r.Body, length, metadata, "") - defer r.Body.Close() - - // create the uploader. - c.Store.Set(upload.Fingerprint, dataServerURL) - uploader := tus.NewUploader(tusc, dataServerURL, upload, 0) - - // start the uploading process. - err = uploader.Upload() - if err != nil { - log.Error().Err(err).Msg("Could not start TUS upload") - w.WriteHeader(http.StatusInternalServerError) - return - } - - w.WriteHeader(http.StatusOK) -} diff --git a/pkg/datatx/datatx.go b/pkg/datatx/datatx.go new file mode 100644 index 0000000000..e496da4d47 --- /dev/null +++ b/pkg/datatx/datatx.go @@ -0,0 +1,31 @@ +// Copyright 2018-2020 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +// Package datatx provides a library to abstract the complexity +// of using various data transfer protocols. +package datatx + +import ( + "context" + "io" +) + +type DataTX interface { + Upload(ctx context.Context, uri string, rc io.Reader) error + Download(ctx context.Context, uri string) (io.Reader, error) +} diff --git a/pkg/datatx/manager/loader/loader.go b/pkg/datatx/manager/loader/loader.go new file mode 100644 index 0000000000..f5f186ad77 --- /dev/null +++ b/pkg/datatx/manager/loader/loader.go @@ -0,0 +1,26 @@ +// Copyright 2018-2020 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package loader + +import ( + // Load core data transfer protocols + _ "github.com/cs3org/reva/pkg/datatx/manager/simple" + _ "github.com/cs3org/reva/pkg/datatx/manager/tus" + // Add your own here +) diff --git a/pkg/datatx/manager/registry/registry.go b/pkg/datatx/manager/registry/registry.go new file mode 100644 index 0000000000..7d3c7b65dc --- /dev/null +++ b/pkg/datatx/manager/registry/registry.go @@ -0,0 +1,34 @@ +// Copyright 2018-2020 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package registry + +import "github.com/cs3org/reva/pkg/datatx" + +// NewFunc is the function that data transfer implementations +// should register at init time. +type NewFunc func(map[string]interface{}) (datatx.DataTX, error) + +// NewFuncs is a map containing all the registered data transfers. +var NewFuncs = map[string]NewFunc{} + +// Register registers a new data transfer new function. +// Not safe for concurrent use. Safe for use from package init. +func Register(name string, f NewFunc) { + NewFuncs[name] = f +} diff --git a/pkg/datatx/manager/simple/simple.go b/pkg/datatx/manager/simple/simple.go new file mode 100644 index 0000000000..6fc28921ab --- /dev/null +++ b/pkg/datatx/manager/simple/simple.go @@ -0,0 +1,64 @@ +// Copyright 2018-2020 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package simple + +import ( + "context" + "io" + + "github.com/cs3org/reva/pkg/datatx" + "github.com/cs3org/reva/pkg/datatx/manager/registry" + "github.com/mitchellh/mapstructure" + "github.com/pkg/errors" +) + +func init() { + registry.Register("simple", New) +} + +type manager struct{} + +type config struct{} + +func parseConfig(m map[string]interface{}) (*config, error) { + c := &config{} + if err := mapstructure.Decode(m, c); err != nil { + err = errors.Wrap(err, "error decoding conf") + return nil, err + } + return c, nil +} + +// New returns a datatx manager implementation that relies on HTTP PUT/GET. +func New(m map[string]interface{}) (datatx.DataTX, error) { + _, err := parseConfig(m) + if err != nil { + return nil, err + } + + return &manager{}, nil +} + +func (m *manager) Upload(ctx context.Context, uri string, rc io.Reader) error { + return nil +} + +func (m *manager) Download(ctx context.Context, uri string) (io.Reader, error) { + return nil, nil +} diff --git a/pkg/datatx/manager/tus/tus.go b/pkg/datatx/manager/tus/tus.go new file mode 100644 index 0000000000..23a75a9428 --- /dev/null +++ b/pkg/datatx/manager/tus/tus.go @@ -0,0 +1,64 @@ +// Copyright 2018-2020 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package tus + +import ( + "context" + "io" + + "github.com/cs3org/reva/pkg/datatx" + "github.com/cs3org/reva/pkg/datatx/manager/registry" + "github.com/mitchellh/mapstructure" + "github.com/pkg/errors" +) + +func init() { + registry.Register("tus", New) +} + +type manager struct{} + +type config struct{} + +func parseConfig(m map[string]interface{}) (*config, error) { + c := &config{} + if err := mapstructure.Decode(m, c); err != nil { + err = errors.Wrap(err, "error decoding conf") + return nil, err + } + return c, nil +} + +// New returns a datatx manager implementation that relies on HTTP PUT/GET. +func New(m map[string]interface{}) (datatx.DataTX, error) { + _, err := parseConfig(m) + if err != nil { + return nil, err + } + + return &manager{}, nil +} + +func (m *manager) Upload(ctx context.Context, uri string, rc io.Reader) error { + return nil +} + +func (m *manager) Download(ctx context.Context, uri string) (io.Reader, error) { + return nil, nil +} diff --git a/internal/http/services/dataprovider/get.go b/pkg/rhttp/datatx/datatx.go similarity index 50% rename from internal/http/services/dataprovider/get.go rename to pkg/rhttp/datatx/datatx.go index e0ef9885a2..54382e71d7 100644 --- a/internal/http/services/dataprovider/get.go +++ b/pkg/rhttp/datatx/datatx.go @@ -16,41 +16,14 @@ // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. -package dataprovider +package datatx import ( - "io" "net/http" - "strings" - provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" - "github.com/cs3org/reva/pkg/appctx" + "github.com/cs3org/reva/pkg/storage" ) -func (s *svc) doGet(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - log := appctx.GetLogger(ctx) - var fn string - files, ok := r.URL.Query()["filename"] - if !ok || len(files[0]) < 1 { - fn = r.URL.Path - } else { - fn = files[0] - } - - fsfn := strings.TrimPrefix(fn, s.conf.Prefix) - ref := &provider.Reference{Spec: &provider.Reference_Path{Path: fsfn}} - - rc, err := s.storage.Download(ctx, ref) - if err != nil { - log.Err(err).Msg("datasvc: error downloading file") - w.WriteHeader(http.StatusInternalServerError) - return - } - - _, err = io.Copy(w, rc) - if err != nil { - log.Error().Err(err).Msg("error copying data to response") - return - } +type DataTX interface { + Handler(prefix string, fs storage.FS) (http.Handler, error) } diff --git a/pkg/rhttp/datatx/manager/loader/loader.go b/pkg/rhttp/datatx/manager/loader/loader.go new file mode 100644 index 0000000000..be6eba0cc2 --- /dev/null +++ b/pkg/rhttp/datatx/manager/loader/loader.go @@ -0,0 +1,26 @@ +// Copyright 2018-2020 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package loader + +import ( + // Load core data transfer protocols + _ "github.com/cs3org/reva/pkg/rhttp/datatx/manager/simple" + _ "github.com/cs3org/reva/pkg/rhttp/datatx/manager/tus" + // Add your own here +) diff --git a/pkg/rhttp/datatx/manager/registry/registry.go b/pkg/rhttp/datatx/manager/registry/registry.go new file mode 100644 index 0000000000..a65c908b98 --- /dev/null +++ b/pkg/rhttp/datatx/manager/registry/registry.go @@ -0,0 +1,34 @@ +// Copyright 2018-2020 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package registry + +import "github.com/cs3org/reva/pkg/rhttp/datatx" + +// NewFunc is the function that data transfer http implementations +// should register at init time. +type NewFunc func(c map[string]interface{}) (datatx.DataTX, error) + +// NewFuncs is a map containing all the registered data transfers http handlers. +var NewFuncs = map[string]NewFunc{} + +// Register registers a new data http transfer new function. +// Not safe for concurrent use. Safe for use from package init. +func Register(name string, f NewFunc) { + NewFuncs[name] = f +} diff --git a/pkg/rhttp/datatx/manager/simple/simple.go b/pkg/rhttp/datatx/manager/simple/simple.go new file mode 100644 index 0000000000..23c75fa0e6 --- /dev/null +++ b/pkg/rhttp/datatx/manager/simple/simple.go @@ -0,0 +1,116 @@ +// Copyright 2018-2020 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package simple + +import ( + "io" + "net/http" + "strings" + + "github.com/pkg/errors" + + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/cs3org/reva/pkg/appctx" + "github.com/cs3org/reva/pkg/rhttp/datatx" + "github.com/cs3org/reva/pkg/rhttp/datatx/manager/registry" + "github.com/cs3org/reva/pkg/storage" + "github.com/mitchellh/mapstructure" +) + +func init() { + registry.Register("simple", New) +} + +type manager struct{} +type config struct{} + +func parseConfig(m map[string]interface{}) (*config, error) { + c := &config{} + if err := mapstructure.Decode(m, c); err != nil { + err = errors.Wrap(err, "error decoding conf") + return nil, err + } + return c, nil +} + +// New returns a datatx manager implementation that relies on HTTP PUT/GET. +func New(m map[string]interface{}) (datatx.DataTX, error) { + _, err := parseConfig(m) + if err != nil { + return nil, err + } + + return &manager{}, nil +} + +func (m *manager) Handler(prefix string, fs storage.FS) (http.Handler, error) { + h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case "GET": + ctx := r.Context() + log := appctx.GetLogger(ctx) + var fn string + files, ok := r.URL.Query()["filename"] + if !ok || len(files[0]) < 1 { + fn = r.URL.Path + } else { + fn = files[0] + } + + fsfn := strings.TrimPrefix(fn, prefix) + ref := &provider.Reference{Spec: &provider.Reference_Path{Path: fsfn}} + + rc, err := fs.Download(ctx, ref) + if err != nil { + log.Err(err).Msg("datasvc: error downloading file") + w.WriteHeader(http.StatusInternalServerError) + return + } + + _, err = io.Copy(w, rc) + if err != nil { + log.Error().Err(err).Msg("error copying data to response") + return + } + return + case "PUT": + ctx := r.Context() + log := appctx.GetLogger(ctx) + fn := r.URL.Path + + fsfn := strings.TrimPrefix(fn, prefix) + ref := &provider.Reference{Spec: &provider.Reference_Path{Path: fsfn}} + + err := fs.Upload(ctx, ref, r.Body) + if err != nil { + log.Error().Err(err).Msg("error uploading file") + w.WriteHeader(http.StatusInternalServerError) + return + } + + r.Body.Close() + w.WriteHeader(http.StatusOK) + return + default: + w.WriteHeader(http.StatusNotImplemented) + return + } + }) + return h, nil +} diff --git a/pkg/rhttp/datatx/manager/tus/tus.go b/pkg/rhttp/datatx/manager/tus/tus.go new file mode 100644 index 0000000000..36f1806cee --- /dev/null +++ b/pkg/rhttp/datatx/manager/tus/tus.go @@ -0,0 +1,220 @@ +// Copyright 2018-2020 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package tus + +import ( + "fmt" + "io" + "net/http" + "path" + "strconv" + "strings" + + "github.com/pkg/errors" + + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/cs3org/reva/pkg/appctx" + "github.com/cs3org/reva/pkg/rhttp" + "github.com/cs3org/reva/pkg/rhttp/datatx" + "github.com/cs3org/reva/pkg/rhttp/datatx/manager/registry" + "github.com/cs3org/reva/pkg/storage" + "github.com/cs3org/reva/pkg/token" + "github.com/eventials/go-tus" + "github.com/eventials/go-tus/memorystore" + "github.com/mitchellh/mapstructure" + tusd "github.com/tus/tusd/pkg/handler" +) + +func init() { + registry.Register("tus", New) +} + +type manager struct{} +type config struct{} + +func parseConfig(m map[string]interface{}) (*config, error) { + c := &config{} + if err := mapstructure.Decode(m, c); err != nil { + err = errors.Wrap(err, "error decoding conf") + return nil, err + } + return c, nil +} + +// New returns a datatx manager implementation that relies on HTTP PUT/GET. +func New(m map[string]interface{}) (datatx.DataTX, error) { + _, err := parseConfig(m) + if err != nil { + return nil, err + } + + return &manager{}, nil +} + +func (m *manager) Handler(prefix string, fs storage.FS) (http.Handler, error) { + composable, ok := fs.(Composable) + if !ok { + // TODO(labkode): break dependency on Go interface and bring TUS logic to CS3APIS for + // resumable uploads/chunking uploads. + return nil, fmt.Errorf("dataprovider: configured storage is not tus-compatible") + } + + // A storage backend for tusd may consist of multiple different parts which + // handle upload creation, locking, termination and so on. The composer is a + // place where all those separated pieces are joined together. In this example + // we only use the file store but you may plug in multiple. + composer := tusd.NewStoreComposer() + + // let the composable storage tell tus which extensions it supports + composable.UseIn(composer) + + config := tusd.Config{ + BasePath: prefix, + StoreComposer: composer, + //Logger: logger, // TODO use logger + } + + handler, err := tusd.NewUnroutedHandler(config) + if err != nil { + return nil, err + } + + h := handler.Middleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + + log := appctx.GetLogger(r.Context()) + log.Info().Msgf("tusd routing: path=%s", r.URL.Path) + + method := r.Method + // https://github.com/tus/tus-resumable-upload-protocol/blob/master/protocol.md#x-http-method-override + if r.Header.Get("X-HTTP-Method-Override") != "" { + method = r.Header.Get("X-HTTP-Method-Override") + } + + switch method { + // TODO(pvince81): allow for range-based requests? + case "GET": + ctx := r.Context() + log := appctx.GetLogger(ctx) + var fn string + files, ok := r.URL.Query()["filename"] + if !ok || len(files[0]) < 1 { + fn = r.URL.Path + } else { + fn = files[0] + } + + fsfn := strings.TrimPrefix(fn, prefix) + ref := &provider.Reference{Spec: &provider.Reference_Path{Path: fsfn}} + + rc, err := fs.Download(ctx, ref) + if err != nil { + log.Err(err).Msg("datasvc: error downloading file") + w.WriteHeader(http.StatusInternalServerError) + return + } + + _, err = io.Copy(w, rc) + if err != nil { + log.Error().Err(err).Msg("error copying data to response") + return + } + return + + // tus.io based upload + + // uploads are initiated using the CS3 APIs Initiate Download call + case "POST": + handler.PostFile(w, r) + case "HEAD": + handler.HeadFile(w, r) + case "PATCH": + handler.PatchFile(w, r) + // PUT provides a wrapper around the POST call, to save the caller from + // the trouble of configuring the tus client. + case "PUT": + ctx := r.Context() + log := appctx.GetLogger(ctx) + + fp := r.Header.Get("File-Path") + if fp == "" { + w.WriteHeader(http.StatusBadRequest) + return + } + + length, err := strconv.ParseInt(r.Header.Get("Content-Length"), 10, 64) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + dataServerURL := fmt.Sprintf("http://%s%s", r.Host, r.RequestURI) + + // create the tus client. + c := tus.DefaultConfig() + c.Resume = true + c.HttpClient = rhttp.GetHTTPClient(ctx) + c.Store, err = memorystore.NewMemoryStore() + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + c.Header.Set(token.TokenHeader, token.ContextMustGetToken(ctx)) + + tusc, err := tus.NewClient(dataServerURL, c) + if err != nil { + log.Error().Err(err).Msg("error starting TUS client") + w.WriteHeader(http.StatusInternalServerError) + return + } + + metadata := map[string]string{ + "filename": path.Base(fp), + "dir": path.Dir(fp), + } + + upload := tus.NewUpload(r.Body, length, metadata, "") + defer r.Body.Close() + + // create the uploader. + c.Store.Set(upload.Fingerprint, dataServerURL) + uploader := tus.NewUploader(tusc, dataServerURL, upload, 0) + + // start the uploading process. + err = uploader.Upload() + if err != nil { + log.Error().Err(err).Msg("Could not start TUS upload") + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) + + // TODO Only attach the DELETE handler if the Terminate() method is provided + case "DELETE": + handler.DelFile(w, r) + } + })) + + return h, nil +} + +// Composable is the interface that a struct needs to implement to be composable by this composer +type Composable interface { + UseIn(composer *tusd.StoreComposer) +} From 603bbcc326190494449b8a21410e68cec169247e Mon Sep 17 00:00:00 2001 From: Hugo Gonzalez Labrador Date: Thu, 11 Jun 2020 12:14:35 +0200 Subject: [PATCH 2/5] fix --- internal/http/services/dataprovider/dataprovider.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/http/services/dataprovider/dataprovider.go b/internal/http/services/dataprovider/dataprovider.go index adb3f4f404..76fdd2b8bf 100644 --- a/internal/http/services/dataprovider/dataprovider.go +++ b/internal/http/services/dataprovider/dataprovider.go @@ -52,6 +52,10 @@ func (c *config) init() { c.Driver = "local" } + if c.DataTX == "" { + c.DataTX = "simple" + } + } type svc struct { @@ -128,7 +132,7 @@ func (s *svc) setHandler() error { } func getDataTX(c *config) (datatx.DataTX, error) { - if f, ok := datatxregistry.NewFuncs[c.Driver]; ok { + if f, ok := datatxregistry.NewFuncs[c.DataTX]; ok { return f(c.DataTXs[c.DataTX]) } return nil, fmt.Errorf("driver not found: %s", c.Driver) From fd4fcdd311b68300182710c5c8359bea73a5061d Mon Sep 17 00:00:00 2001 From: Hugo Gonzalez Labrador Date: Thu, 11 Jun 2020 12:21:47 +0200 Subject: [PATCH 3/5] fix server --- cmd/revad/runtime/loader.go | 1 + internal/http/services/dataprovider/dataprovider.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/revad/runtime/loader.go b/cmd/revad/runtime/loader.go index 2298fe219b..53956136b3 100644 --- a/cmd/revad/runtime/loader.go +++ b/cmd/revad/runtime/loader.go @@ -35,6 +35,7 @@ import ( _ "github.com/cs3org/reva/pkg/ocm/provider/authorizer/loader" _ "github.com/cs3org/reva/pkg/ocm/share/manager/loader" _ "github.com/cs3org/reva/pkg/publicshare/manager/loader" + _ "github.com/cs3org/reva/pkg/rhttp/datatx/manager/loader" _ "github.com/cs3org/reva/pkg/share/manager/loader" _ "github.com/cs3org/reva/pkg/storage/fs/loader" _ "github.com/cs3org/reva/pkg/storage/registry/loader" diff --git a/internal/http/services/dataprovider/dataprovider.go b/internal/http/services/dataprovider/dataprovider.go index 76fdd2b8bf..49b92a0a0b 100644 --- a/internal/http/services/dataprovider/dataprovider.go +++ b/internal/http/services/dataprovider/dataprovider.go @@ -135,5 +135,5 @@ func getDataTX(c *config) (datatx.DataTX, error) { if f, ok := datatxregistry.NewFuncs[c.DataTX]; ok { return f(c.DataTXs[c.DataTX]) } - return nil, fmt.Errorf("driver not found: %s", c.Driver) + return nil, fmt.Errorf("driver not found: %s", c.DataTX) } From 4e5c0da8e0cf6ef535c1053c473b23a47348e074 Mon Sep 17 00:00:00 2001 From: Hugo Gonzalez Labrador Date: Fri, 12 Jun 2020 14:03:41 +0200 Subject: [PATCH 4/5] wip --- .../storageprovider/.storageprovider.go.swp | Bin 0 -> 40960 bytes .../storageprovider/storageprovider.go | 9 +- pkg/rhttp/datatx/manager/tus/tus.go | 135 ++----- pkg/storage/fs/local/local.go | 31 ++ pkg/storage/fs/local/upload.go | 334 ------------------ pkg/storage/storage.go | 25 +- 6 files changed, 95 insertions(+), 439 deletions(-) create mode 100644 internal/grpc/services/storageprovider/.storageprovider.go.swp delete mode 100644 pkg/storage/fs/local/upload.go diff --git a/internal/grpc/services/storageprovider/.storageprovider.go.swp b/internal/grpc/services/storageprovider/.storageprovider.go.swp new file mode 100644 index 0000000000000000000000000000000000000000..9b98a294f58c607eacaf8a881cba127928617ac7 GIT binary patch literal 40960 zcmeI537lkAdFTsdtGIv~V-kJIO)1#T^i($k2=q9?rn_gRVK%0_S%TQsRNbz=nX0-) z-Kt&~2asPtc|K#*7!*`|iAzk}5@Xc7M1DTksClv|4~Y6Spzr^krS7e| zx2mfb)R$ki{^{wu_nzfD=X~dU-}&yTLnD_=?RC!0ZcFiVN-Fh&9p8NM1$QkTHpm%9CUem$4XyR}L-ztHWzYixqU1$S|{<}T)jy-Jffw_5WTy@Fe7|2tdsyXDir)|WtE0&Nm#)aSRJ`{dM_+s+t~E6>UNmh<$d z&9>dt|FbWFz6AOb=u4n4fxZO#66i~yFM+-U{xeIUUOg#wEfqZ0RMIuSk7@gToca4Y z^LxJSdfNQ$ncx4`cKvws_e;(1{U!sO&k5#pDU1AkrtSKP`r}mD{Jzi>*gk)f`TTnG z`$KKlpI|=En&0P{5A5^iaI zS4gFvNzLvQLzw-4YyW@i{ zJ}H%Y0R9C&51)pQ!iV8K@J_f5{t#}0SHKKB1@3)fD)k9?3k0wio&`VnrBv!Ga67yU z-U;u3H$W9CP=?Fkb2ubs;LA7$3UD6|gv;R%a5Ov(UV}s8X>bah4ByAW@pbqNd6!;WvBkglX7iYzMqmp+90_SuSLow*gRtTS{yZO3Xn5-14KA{%Dz@T%* zl|jAcRf^6Lr{>lhwMz4oBS)R1snpR_+I${lO&CUYdZAG-_)C?6yidHway@%NE`PXK z^Ba}Iz~Epip7(gk4_q>HYc;>-98sy}7IR)Hcc|oEHtm?7E3Q1NAzvk6fQt3jjo|`5z#I4O9+@qg7y-_P=`74#Km#aJd5-}n+O~=voOQp+{ zuYMp`Ul?-KKc^;|mrTXNXXGgra+2LSok~;9`RFzNeY$++6;t(6HY*_l!2jFFKcC)20|Ze9O1!sFh&Q*seqF`FOg3ZU0l9M)+ z1czJh)}OZBKT-4OCvtPi&*w_^0sFDJTX)#JZPY=>t9W&~tL~_i73rn3)smkp%q-LP zhkU=()*agZdX$CK^#$6e?mOllXJ&TVNFq?VPcB!{9bqS3hxAnnbJFMM zWTd(^Tl-icTy?7>x2i$y)+mF+NMk56wq1wPI{A|4R_e}@S1LJ&Tt{Rh70}H$EP7FT z3KgoMY)dP0&8s8EmNAJawIYZ$UgU$ec8t_u4uxlH5qRZl$z^n_F23Lgbv3@X&rnv; zkn^ZqDR*nFQt5Eenk?5`AT+?-ywbL|ELKBoSEG@d*WGdzMOs^-7^0SSk5Z%3b|-R= zbm_=HRWRUuF7GlEwTnWhV6#OUqcWt1wN@UkG9M8y*bE#To>xQ34Es>RYp!uYu$lB_vnnlN`TnI zoK;(_w~ckqRaCCcdsyWypEyelUOgxx89PgUqf`(9m{Lhe)I6zMr*)dFNu(#UeGx6> z)&??0l6D=C8FKcyOJ+?Gs^!5{I#cxhV#y`;vy`h8vwp3pf<{sCT*>-Xw~}`&K_kd| z{xCW<@2V?aeW7tEoA=AZ)x*VMwH^uDE|k4G3b0gK7>;W3wwv<7nUbRlT9uScvFT`h zCUvQ;^!CImcPa6ZdQNnwR4uGPdlA&FtK36{r6pENDp#%MnWj={+YC-Z#zlWCMaWe> zhFx%GQv&vp;l(o!x%J!`q(Te#kbp$|HlLxYdXT81@=yBCqEt@xM1K3~LdC5Qiz?)U z`Ep%;ai1(nGbyy9U#mL<*aBJ`*d?7UAV$ovgKi8OhiGzibl{d4O}>+>tT?~hsChxb zQ`2x(ZY(k_vvk(l+&Q%3EPAy%GXevUCrMjc$jR~liZdlq%tfD>rc!3A$d#P^wPLQ~ zU7eFzT~eE>ILNrPA`9>Of)_X?;+$Fi4LReJ2luf&a092}Gs)ywzo-#jMe{JYQEKE+ zaJ+K4!6L9dlk$O2gO!XBNhs=>6YQ!<}5=Y^o^=Dm3@@5nf6pf(*{ zbSYcat(CnXkhO?pD;ZrnHM4vF?2L2i*ujHi`(~ylr=9%=o$>wqCZ=Yl_V44bozB?4 z%bkm+_Du{qF8!6VFtOI$AP{lOY~`UCB`qd>jE_SN5p1l`4q}Pk;3Kn&lNp@6>F|(y^6DhAwlhioWP&2FXd`3 z$w*4_V^+#bOH0`t9cRH+n#D9vFuZ4Kd~)CP@_v=S zv|hHd&ebLT6?3b!ER%z7d8Iks<-A*^4_0_u@N<(uxSxXW71P+uD)-C|M-1ESg>1_V1X<-ck$RAEq*^SYOPnyTHM+pq-x4znJx1bOt7%BY#pgx2a|Is5Ip_d zsJ5ws>`cvLGsvCymIq?b4>?Mxcx&z5yK_N@oAK_)H>${^yjSfL*&IwtgKVa$0vKu7 z&4EZ+54!U@e_e~CLu>%0r4dJ+`GBJG=QO7FsB@Z?2X3!<9B9~jb8;V>I#OpobVhe1 z-DS6BOs3RTl}2sk&_>R0?U4G*=8(!z+D%dlvhivm>iLRXq@LMbuG$wH7&O~HZRz{9 zK(&2_d&MaCDvP<2R~U6N&KC9jw33W${iS7FDt8FS3!?BHb)VxE_;*b;yDDl+ma?A} z*>N&idj+ImrKvhaTFcY^O7E}cu4=ebl)_eFip|HY{a6Vff!onSVd+B@4!gM8sL7tf zkh6sM9{**dB7>!>JOYROdfhMM3R%I3=4Sl&6>%KcbhFZ^X~{O)ih8SZs(|FXHL|`z z)vaR-%dVdhkW?sa(|Bjq{MXWAKWfCND0_Tr=rg3 zviG57*Hq$4VVjvVn=Cjjkczvcl%!NPCfkeLm4NJI258O-c3#qxWVF$w@zyW#%-}j3 zT}r3LB~#3$vSQ&QTG>?J&Xd6+vnE%VL_%Gy<`WS;l5eSI+6u-bA7n#C$?8XUjie}* z&caSuM7dW*^lS2y2k}U|LA^!icDhrKgoq-J1m55|Hv>&ZNjTJOtL?u?I)pR=ceYL} z(_M#M=MItBpxqN&>ll7KM=%p;`vC5o-QzfXlO)!|;N8rD+pXT~YGpI3)NuR-7_^;4 zyxNdWdM;_eCg0a>xSD7%yLy!|+YGI_Y?}3|vmDIGCaj2G=D}t#+o;o!&y@8yH6eG( zkNLBCMVl1OEAppYBN#(RzUk-k*=Br)Ts0cira7-N$26R`pO9Xj8*L~XYWXzt?4E$p&zkm^-1AvwoMP~Y(TtJLr7NtG8HxBocJE=Llg3J{tRlU{ z89C?dv*W5KpVeU(t0df9Wu)`XN+@aIccA45%2Y*}FycihMDZNn!_pxd!%Eqdhd>5po0ZEaqT1Bz zt!+7?bR9vKdA}fQP8l|lg$E*y28v`Sl~{ADEn*+IPv=9)hBzD0M0Ifs1x++ja23_+``R8J)K9Jx z(?Pg{Q7Q76tjm?KkX<=<*d6F4BzedocsHuk%;HittW`}mlSq7<4gscVd&@-iLhE;m z$VW#`9WVkLJ20h(&Oks#sdU_L&g(6AAvW)8{rjjBisODvpI{}_A_ z=HU>W4^Nz)8KgcTkQ1@!297ED8nB33HJH-;b9P4 z{=M)j*bh5l94>$i91q9BL)h;R!uQ|?_#Ke<4R*mcNWo(?+N1CV_$2%Zg$H!)_flx%lCb zY8Epxvf8~j6Jb{LDp|>FcxY)sZKU84QOhb)HAs8Gr_~EhN!{Bv*=pTei!a{2qhj6k z?mgj_+mi>eQXh`+TG5ww^ad zMpZ+osyew6buX+Cc1?hFpw>TmamJvUly#FG(>F}2>q*(zEekU-%f58GM>SrZgp9(? z56LAUq&dGJ#mFj~RIHiaH$!woOn9~Epb9syAE%j3s@`ET?%XprZJf1bAvIrZz_mS+ zh(p;ck(&tTUx2XRxKWVe}zA5FG?DUH^(ueo> z#Uk%N$-N2d-`3Q6>j7G1akU$!WmRHi$sqgxzk%WReC$B6|LysEk74&e3ZI9+gc^7- z4?AHHo&~=OC&KY?9IRmXi*H~jOu}!#li@4a|F^=;P=!*JQq%YW8oP1625>} z!^_|(Tn)0XFTR4q@V~MD#ZT})xB5TC#gvGe~O z9)UaIPvMW?b#Ogg2iL*_wDsS^U%?F^`~RbG4oE*Z8&>(*oMm_yRZCaJwd?N0Jz!pM zv7MJ0WYe^5JeOV+Zv3evott!5lG;tyx~9^ESn$fB+Po2yRzfc1MPjS7y6gnYTiy}r z1)=yhVndZk)vnDQ-gi@PiglPRJC`8dfU1&qFC;W=C5eOXqNlQvrolO-q96u;22Pf> zlwX&sixDf9c!Hj9&!d;_*ZxdW=ZoLdd)JfDQ0Zr${FW86?Rh=-yM#5kQWz@5aF;@7 z;FJq89jhIR(6CR)w6oJJ!8Ed?DvU`Ikt7w&c1AUuOG>(*QK~lcP3HjHaDnpx$;c4sFt&8b!uiIG?Qok= zYN;x22dSy~CRRNCe9}nPy#!G)s0t(W1rH zB+ZygSdT9I|6h_3=X`Wsy)jYtrd?Cr&0Ep2=tG}O`=i++Q5CIPFW#-YEWhREhzgXk87^+(DXmG} zn6ifIw=J_?wTtKwvHvj%Q=gZ!<*@(D#_;7-r zCVU;f27dv62(N+bK;JlLRzh`JVtLeM*OkxbcFEx(lasN)9m2X) zppZFmB`lLF?}k>$9WdDC z&`VK7XX?9?3KPBBtvIc^G~Ok97|SLr|)n`o22R- z@+uv=dvfOLO*dZ-w^{PoS>e1VrB`y_ci8T2 zV!^Ym-pk{+eT9zKaoR0ut3KMEvLo)a)3kPJyDW)^iOS9DMM8%I z`6?tcUD)U7wYug{fP@qj|+_aWaCr`3chtkJ7*&n8y zy^Q^qYQ=ZmS}u2M@%8(HOkp?66VztCckEl~w8>ahjqa5lokA@|3_G56rAljs&Q#TQ z;!fR^uAn>j#_al*;&0s7)+XVgxsTI%-O@J9tmaLy>RZYh5w)4f!-3ZG5ZOGlzNKsl zMt#e5qei=%^es>HY|yW15o%O-WD_lv>P9qYtvRon4K&-RZH7s={H^*_n-+FaX&$f} zW_m8Q9voSvCCsT}|34MGQSJF+|6gzH{~u!K|2sSaABXqDD`78)Phbq>T)=0;PqFtO zh5JGL0^flbgPac_X8{htaqtNC|EJ+Ca4|&B{CggM9}f>>^UIlk_ko-da0h%4Zh~F# z+prx*;k(%X5(gl@f!%OEya42V|5M-*d;s^uy&&%n{4QJwDYysQ{}y-+{64%2=3y4L z!u|LH{uRCkUxk~&hxYda9>o_Reu6K;Q*a>+!N1}g_#1dB1W*MZcEdSvGTeoa z;FIui_!z9f#US5LkcJaL-VOLFK7x@ zAv_08gyY~5d!ATPI2UXjfSd~u|G}LVD%K$<`qqy*5xE(RKkBHreRz9L z9i?m{c+J!E@>ZQb+W89muPb$c4X1(Vm+S5Ey7IpIPQK+J2=6qn*DG_ks9&H9&vYNK z!(!)jB;FvvRkEUHX?N_?B-tJJS8D1RsCBWX`% z-02o6PA>*UJ{2QaA*0S#Jt_#}yEbaSny|sNOCBRWBtIUn@{NhuK zEV!btgW6}J16jYws~3>Xk)Te-5c_`-1JB0)d&b!N9CrO*z?~sM*kAr4X*~-!`}-h z!B4Qke+-|7JK+v^11!ThWFZA##s>cY{65UVi{K)dg57X8Hu%fnLO2he0sn-pEwT9W zj{fZ+@%RDAS$w-;63&8ca0bZt0p130fg|u@*asKEg>W9MAx=MTx^K=Wp#z{fnzVw| zs+F(R;8mG5#|~rtwZ8Y0N=rJTcqODMc3N(d7vH4Oi(+YJ7gs3FOPixe%#7nW>R4Dr z);gDvo5(Gq%Q>sHL!CLyL_{&pX-gFj#NA*JanYzCNmIT^g>Rfno_69zjLpGvI&ahZ zv(6Xu+yyIB=8y!LSE#6ZHF`=SzvIq+9S@@RX=P{KZsBpz3&xiEBuA%UgExBB27gh7 zY{DL&DMydpLbpa@w}>`WFMcKceRdK%8~y}*Ays8Q+x}H|GG4~)sa50ltNS=)Guu4N z>aywRpbAq=-UUqSFEQji2KDuyx)5@rl|3hu#A2OsO1@?YD^UF2`uRw|OsiM#*lXn! zUG?pTD$tnZ1~SWXs87pr(Wh=%9vqZYb=5=j74FboB62LTT$dOAB>s+= z0(GYxoVGR4U+2eaG*Suf*1AWZ)kQ~+4jpxtw7tTEs?<{xbi56{a$pCJPOmv`tXnjU zujnq$*dR*V0w#?hFoO zdS!S9t?h!{I;XZ`LaU2!>%17*6&v_;MDQHh@-2ukua46GE<;z1CMV=Q>T@1avWbX zX1=sDVWNl^yO%j5{?x&Q!(fx{>V1I`pGj=bX#2|48B`M2L6Do z1iDTS_UXt3u@<$h^m?ZmAe?AyW64yr&QbKU?CS|aS#NLeICfVZ2E?_~viXwVCe>1--(1UzY$pkr8;8_aP;i0;zGz@;iLXeM?`1sfR?c7LelZ^O xF@$_QDTe`)nf Date: Wed, 1 Jul 2020 15:45:01 +0200 Subject: [PATCH 5/5] work in progress --- pkg/rhttp/datatx/manager/tus/tus.go | 151 +++++++++++++++++++++++++++- pkg/storage/utils/localfs/upload.go | 32 +++--- 2 files changed, 158 insertions(+), 25 deletions(-) diff --git a/pkg/rhttp/datatx/manager/tus/tus.go b/pkg/rhttp/datatx/manager/tus/tus.go index 0330ddb7ad..fe5a9ef688 100644 --- a/pkg/rhttp/datatx/manager/tus/tus.go +++ b/pkg/rhttp/datatx/manager/tus/tus.go @@ -20,8 +20,13 @@ package tus import ( "context" + "encoding/json" "io" + "io/ioutil" "net/http" + "os" + "path" + "path/filepath" "strings" "github.com/pkg/errors" @@ -72,8 +77,6 @@ func (m *manager) Handler(prefix string, fs storage.FS) (http.Handler, error) { config := tusd.Config{ BasePath: prefix, StoreComposer: composer, - - //Logger: logger, // TODO use logger } handler, err := tusd.NewUnroutedHandler(config) @@ -140,11 +143,11 @@ type Composable interface { } type proxy struct { - fs storage.FS + storage.FS } func newProxy(fs storage.FS) *proxy { - return &proxy{fs: fs} + return &proxy{FS: fs} } func (p *proxy) UseIn(c *tusd.StoreComposer) { @@ -155,9 +158,147 @@ func (p *proxy) UseIn(c *tusd.StoreComposer) { } func (p *proxy) NewUpload(ctx context.Context, info tusd.FileInfo) (upload tusd.Upload, err error) { - return + log := appctx.GetLogger(ctx) + log.Debug().Interface("info", info).Msg("tus: NewUpload") + + fn := info.MetaData["filename"] + if fn == "" { + return nil, errors.New("tus: missing filename in metadata") + } + info.MetaData["filename"] = path.Clean(info.MetaData["filename"]) + + dir := info.MetaData["dir"] + if dir == "" { + return nil, errors.New("tus: missing dir in metadata") + } + info.MetaData["dir"] = path.Clean(info.MetaData["dir"]) + + fullpath := path.Join("/", dir, fn) + ref := &provider.Reference{ + Spec: &provider.Reference_Path{ + Path: fullpath, + }, + } + + id, err := p.InitiateUpload(ctx, ref, info.Size, info.MetaData) + if err != nil { + return nil, errors.New("tus: error obtaining upload if from fs") + } + info.ID = id + log.Debug().Interface("", info).Msg("tus: obtained id from fs") + + return nil, nil } func (p *proxy) GetUpload(ctx context.Context, id string) (upload tusd.Upload, err error) { return nil, nil } + +type fileUpload struct { + info tusd.FileInfo + file string + fileInfo string + fs storage.FS +} + +// GetInfo returns the FileInfo +func (upload *fileUpload) GetInfo(ctx context.Context) (tusd.FileInfo, error) { + return upload.info, nil +} + +// GetReader returns an io.Reader for the upload +func (upload *fileUpload) GetReader(ctx context.Context) (io.Reader, error) { + ref := &provider.Reference{Spec: &provider.Reference_Path{Path: upload.file}} + return upload.fs.Download(ctx, ref) +} + +// WriteChunk writes the stream from the reader to the given offset of the upload +// TODO use the grpc api to directly stream to a temporary uploads location in the eos shadow tree +func (upload *fileUpload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) { + file, err := os.OpenFile(upload.binPath, os.O_WRONLY|os.O_APPEND, defaultFilePerm) + if err != nil { + return 0, err + } + defer file.Close() + + n, err := io.Copy(file, src) + + // If the HTTP PATCH request gets interrupted in the middle (e.g. because + // the user wants to pause the upload), Go's net/http returns an io.ErrUnexpectedEOF. + // However, for OwnCloudStore it's not important whether the stream has ended + // on purpose or accidentally. + if err != nil { + if err != io.ErrUnexpectedEOF { + return n, err + } + } + + upload.info.Offset += n + err = upload.writeInfo() + + return n, err +} + +// writeInfo updates the entire information. Everything will be overwritten. +func (upload *fileUpload) writeInfo() error { + data, err := json.Marshal(upload.info) + if err != nil { + return err + } + return ioutil.WriteFile(upload.infoPath, data, defaultFilePerm) +} + +// FinishUpload finishes an upload and moves the file to the internal destination +func (upload *fileUpload) FinishUpload(ctx context.Context) error { + + checksum := upload.info.MetaData["checksum"] + if checksum != "" { + // check checksum + s := strings.SplitN(checksum, " ", 2) + if len(s) == 2 { + alg, hash := s[0], s[1] + + log := appctx.GetLogger(ctx) + log.Debug(). + Interface("info", upload.info). + Str("alg", alg). + Str("hash", hash). + Msg("eos: TODO check checksum") // TODO this is done by eos if we write chunks to it directly + + } + } + np := filepath.Join(upload.info.MetaData["dir"], upload.info.MetaData["filename"]) + + // TODO check etag with If-Match header + // if destination exists + //if _, err := os.Stat(np); err == nil { + // copy attributes of existing file to tmp file befor overwriting the target? + // eos creates revisions internally + //} + + err := upload.fs.c.WriteFile(ctx, upload.info.Storage["Username"], np, upload.binPath) + + // only delete the upload if it was successfully written to eos + if err == nil { + // cleanup in the background, delete might take a while and we don't need to wait for it to finish + go func() { + if err := os.Remove(upload.infoPath); err != nil { + if !os.IsNotExist(err) { + log := appctx.GetLogger(ctx) + log.Err(err).Interface("info", upload.info).Msg("eos: could not delete upload info") + } + } + if err := os.Remove(upload.binPath); err != nil { + if !os.IsNotExist(err) { + log := appctx.GetLogger(ctx) + log.Err(err).Interface("info", upload.info).Msg("eos: could not delete upload binary") + } + } + }() + } + + // TODO: set mtime if specified in metadata + + // metadata propagation is left to the storage implementation + return err +} diff --git a/pkg/storage/utils/localfs/upload.go b/pkg/storage/utils/localfs/upload.go index 240dcb1077..33acd841c0 100644 --- a/pkg/storage/utils/localfs/upload.go +++ b/pkg/storage/utils/localfs/upload.go @@ -21,6 +21,7 @@ package localfs import ( "context" "encoding/json" + "fmt" "io" "io/ioutil" "os" @@ -74,37 +75,28 @@ func (fs *localfs) Upload(ctx context.Context, ref *provider.Reference, r io.Rea return nil } -// InitiateUpload returns an upload id that can be used for uploads with tus -// It resolves the resurce and then reuses the NewUpload function -// Currently requires the uploadLength to be set -// TODO to implement LengthDeferrerDataStore make size optional -// TODO read optional content for small files in this request +// InitiateUpload returns an upload id that can be used for chunked uploads like with TUS protocol. func (fs *localfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (uploadID string, err error) { np, err := fs.resolve(ctx, ref) if err != nil { return "", errors.Wrap(err, "localfs: error resolving reference") } - info := tusd.FileInfo{ - MetaData: tusd.MetaData{ - "filename": filepath.Base(np), - "dir": filepath.Dir(np), - }, - Size: uploadLength, - } + // generate new id + id := uuid.New().String() - if metadata != nil && metadata["mtime"] != "" { - info.MetaData["mtime"] = metadata["mtime"] - } + metadata["filename"] = filepath.Base(np) + metadata["dir"] = filepath.Base(np) + metadata["id"] = id + metadata["size"] = fmt.Sprintf("%d", uploadLength) - upload, err := fs.NewUpload(ctx, info) + js, err := json.Marshal(metadata) if err != nil { - return "", err + return errors.Wrap(err, "localfs: error when marshaling upload metadata") } - info, _ = upload.GetInfo(ctx) - - return info.ID, nil + // we need to persist this metadata + return id, nil } // UseIn tells the tus upload middleware which extensions it supports.