From b54df7d1256dcc794d1edcd0315c2ce55f0c5747 Mon Sep 17 00:00:00 2001 From: jkoberg Date: Fri, 13 Jan 2023 12:04:57 +0100 Subject: [PATCH 1/2] pass the eventsstream to storage-providers Signed-off-by: jkoberg --- .../storageprovider/storageprovider.go | 61 ++++++++++++++++++- .../services/dataprovider/dataprovider.go | 22 ++++--- .../storage/eoshomewrapper/eoshomewrapper.go | 3 +- pkg/cbox/storage/eoswrapper/eoswrapper.go | 3 +- pkg/storage/fs/cephfs/cephfs.go | 3 +- pkg/storage/fs/cephfs/unsupported.go | 3 +- pkg/storage/fs/eos/eos.go | 3 +- pkg/storage/fs/eosgrpc/eosgrpc.go | 3 +- pkg/storage/fs/eosgrpchome/eosgrpchome.go | 3 +- pkg/storage/fs/eoshome/eoshome.go | 3 +- pkg/storage/fs/local/local.go | 3 +- pkg/storage/fs/localhome/localhome.go | 3 +- pkg/storage/fs/nextcloud/nextcloud.go | 3 +- pkg/storage/fs/nextcloud/nextcloud_test.go | 2 +- pkg/storage/fs/ocis/ocis.go | 5 +- pkg/storage/fs/ocis/ocis_test.go | 2 +- pkg/storage/fs/owncloudsql/owncloudsql.go | 3 +- pkg/storage/fs/registry/registry.go | 7 ++- pkg/storage/fs/s3/s3.go | 3 +- pkg/storage/fs/s3ng/s3ng.go | 5 +- pkg/storage/fs/s3ng/s3ng_test.go | 4 +- .../utils/decomposedfs/decomposedfs.go | 45 +------------- .../utils/decomposedfs/decomposedfs_test.go | 2 +- .../grpc/gateway_storageprovider_test.go | 8 +-- .../integration/grpc/storageprovider_test.go | 5 +- 25 files changed, 122 insertions(+), 85 deletions(-) diff --git a/internal/grpc/services/storageprovider/storageprovider.go b/internal/grpc/services/storageprovider/storageprovider.go index 5446cc264f..2491ee5b8c 100644 --- a/internal/grpc/services/storageprovider/storageprovider.go +++ b/internal/grpc/services/storageprovider/storageprovider.go @@ -19,9 +19,13 @@ package storageprovider import ( + "bytes" "context" + "crypto/tls" + "crypto/x509" "encoding/json" "fmt" + "io" "net/url" "os" "path" @@ -35,6 +39,8 @@ import ( "github.com/cs3org/reva/v2/pkg/appctx" ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" "github.com/cs3org/reva/v2/pkg/errtypes" + "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/events/stream" "github.com/cs3org/reva/v2/pkg/mime" "github.com/cs3org/reva/v2/pkg/rgrpc" "github.com/cs3org/reva/v2/pkg/rgrpc/status" @@ -44,6 +50,7 @@ import ( "github.com/cs3org/reva/v2/pkg/storagespace" rtrace "github.com/cs3org/reva/v2/pkg/trace" "github.com/cs3org/reva/v2/pkg/utils" + "github.com/go-micro/plugins/v4/events/natsjs" "github.com/mitchellh/mapstructure" "github.com/pkg/errors" "go.opentelemetry.io/otel/attribute" @@ -66,6 +73,15 @@ type config struct { CustomMimeTypesJSON string `mapstructure:"custom_mimetypes_json" docs:"nil;An optional mapping file with the list of supported custom file extensions and corresponding mime types."` MountID string `mapstructure:"mount_id"` UploadExpiration int64 `mapstructure:"upload_expiration" docs:"0;Duration for how long uploads will be valid."` + Events eventconfig `mapstructure:"events" docs:"0;Event stream configuration"` +} + +type eventconfig struct { + NatsAddress string `mapstructure:"nats_address" docs:"address of the nats server"` + NatsClusterID string `mapstructure:"nats_clusterid" docs:"clusterid of the nats server"` + EnableTLS bool `mapstructure:"nats_enable_tls" docs:"events tls switch"` + TLSInsecure bool `mapstructure:"tls_insecure" docs:"Whether to verify the server TLS certificates."` + TLSRootCACertificate string `mapstructure:"tls_root_ca_cert" docs:"The root CA certificate used to validate the server's TLS certificate."` } func (c *config) init() { @@ -1182,9 +1198,15 @@ func (s *service) addMissingStorageProviderID(resourceID *provider.ResourceId, s } func getFS(c *config) (storage.FS, error) { + evstream, err := estreamFromConfig(c.Events) + if err != nil { + return nil, err + } + if f, ok := registry.NewFuncs[c.Driver]; ok { - return f(c.Drivers[c.Driver]) + return f(c.Drivers[c.Driver], evstream) } + return nil, errtypes.NotFound("driver not found: " + c.Driver) } @@ -1201,3 +1223,40 @@ func (v descendingMtime) Less(i, j int) bool { func (v descendingMtime) Swap(i, j int) { v[i], v[j] = v[j], v[i] } + +func estreamFromConfig(c eventconfig) (events.Stream, error) { + if c.NatsAddress == "" { + return nil, nil + } + var ( + rootCAPool *x509.CertPool + tlsConf *tls.Config + ) + if c.TLSRootCACertificate != "" { + rootCrtFile, err := os.Open(c.TLSRootCACertificate) + if err != nil { + return nil, err + } + + var certBytes bytes.Buffer + if _, err := io.Copy(&certBytes, rootCrtFile); err != nil { + return nil, err + } + + rootCAPool = x509.NewCertPool() + rootCAPool.AppendCertsFromPEM(certBytes.Bytes()) + c.TLSInsecure = false + + tlsConf = &tls.Config{ + InsecureSkipVerify: c.TLSInsecure, //nolint:gosec + RootCAs: rootCAPool, + } + } + + s, err := stream.Nats(natsjs.Address(c.NatsAddress), natsjs.ClusterID(c.NatsClusterID), natsjs.TLSConfig(tlsConf)) + if err != nil { + return nil, err + } + + return s, nil +} diff --git a/internal/http/services/dataprovider/dataprovider.go b/internal/http/services/dataprovider/dataprovider.go index 2f859977a4..0e48629dff 100644 --- a/internal/http/services/dataprovider/dataprovider.go +++ b/internal/http/services/dataprovider/dataprovider.go @@ -81,12 +81,7 @@ func New(m map[string]interface{}, log *zerolog.Logger) (global.Service, error) conf.init() - fs, err := getFS(conf) - if err != nil { - return nil, err - } - - var publisher events.Publisher + var evstream events.Stream if conf.NatsAddress == "" || conf.NatsClusterID == "" { log.Warn().Msg("missing or incomplete nats configuration. Events will not be published.") @@ -114,13 +109,20 @@ func New(m map[string]interface{}, log *zerolog.Logger) (global.Service, error) RootCAs: rootCAPool, } } - publisher, err = stream.Nats(natsjs.TLSConfig(tlsConf), natsjs.Address(conf.NatsAddress), natsjs.ClusterID(conf.NatsClusterID)) + s, err := stream.Nats(natsjs.TLSConfig(tlsConf), natsjs.Address(conf.NatsAddress), natsjs.ClusterID(conf.NatsClusterID)) if err != nil { return nil, err } + + evstream = s + } + + fs, err := getFS(conf, evstream) + if err != nil { + return nil, err } - dataTXs, err := getDataTXs(conf, fs, publisher) + dataTXs, err := getDataTXs(conf, fs, evstream) if err != nil { return nil, err } @@ -135,9 +137,9 @@ func New(m map[string]interface{}, log *zerolog.Logger) (global.Service, error) return s, err } -func getFS(c *config) (storage.FS, error) { +func getFS(c *config, stream events.Stream) (storage.FS, error) { if f, ok := registry.NewFuncs[c.Driver]; ok { - return f(c.Drivers[c.Driver]) + return f(c.Drivers[c.Driver], stream) } return nil, fmt.Errorf("driver not found: %s", c.Driver) } diff --git a/pkg/cbox/storage/eoshomewrapper/eoshomewrapper.go b/pkg/cbox/storage/eoshomewrapper/eoshomewrapper.go index 3e46f65104..e3f31b8d63 100644 --- a/pkg/cbox/storage/eoshomewrapper/eoshomewrapper.go +++ b/pkg/cbox/storage/eoshomewrapper/eoshomewrapper.go @@ -27,6 +27,7 @@ import ( provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" "github.com/cs3org/reva/v2/pkg/errtypes" + "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/fs/registry" "github.com/cs3org/reva/v2/pkg/storage/utils/eosfs" @@ -65,7 +66,7 @@ func parseConfig(m map[string]interface{}) (*eosfs.Config, string, error) { // New returns an implementation of the storage.FS interface that forms a wrapper // around separate connections to EOS. -func New(m map[string]interface{}) (storage.FS, error) { +func New(m map[string]interface{}, _ events.Stream) (storage.FS, error) { c, t, err := parseConfig(m) if err != nil { return nil, err diff --git a/pkg/cbox/storage/eoswrapper/eoswrapper.go b/pkg/cbox/storage/eoswrapper/eoswrapper.go index 6f1e083746..0158dc074b 100644 --- a/pkg/cbox/storage/eoswrapper/eoswrapper.go +++ b/pkg/cbox/storage/eoswrapper/eoswrapper.go @@ -30,6 +30,7 @@ import ( provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" "github.com/cs3org/reva/v2/pkg/errtypes" + "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/fs/registry" "github.com/cs3org/reva/v2/pkg/storage/utils/eosfs" @@ -85,7 +86,7 @@ func parseConfig(m map[string]interface{}) (*eosfs.Config, string, error) { // New returns an implementation of the storage.FS interface that forms a wrapper // around separate connections to EOS. -func New(m map[string]interface{}) (storage.FS, error) { +func New(m map[string]interface{}, _ events.Stream) (storage.FS, error) { c, t, err := parseConfig(m) if err != nil { return nil, err diff --git a/pkg/storage/fs/cephfs/cephfs.go b/pkg/storage/fs/cephfs/cephfs.go index 0b10336fd6..20e4b63cdb 100644 --- a/pkg/storage/fs/cephfs/cephfs.go +++ b/pkg/storage/fs/cephfs/cephfs.go @@ -35,6 +35,7 @@ import ( provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/v2/pkg/appctx" "github.com/cs3org/reva/v2/pkg/errtypes" + "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/fs/registry" "github.com/mitchellh/mapstructure" @@ -64,7 +65,7 @@ func init() { // New returns an implementation to of the storage.FS interface that talk to // a ceph filesystem. -func New(m map[string]interface{}) (fs storage.FS, err error) { +func New(m map[string]interface{}, _ events.Stream) (fs storage.FS, err error) { c := &Options{} if err = mapstructure.Decode(m, c); err != nil { return nil, errors.Wrap(err, "error decoding conf") diff --git a/pkg/storage/fs/cephfs/unsupported.go b/pkg/storage/fs/cephfs/unsupported.go index a409351b14..e11e805168 100644 --- a/pkg/storage/fs/cephfs/unsupported.go +++ b/pkg/storage/fs/cephfs/unsupported.go @@ -24,6 +24,7 @@ package cephfs import ( "github.com/pkg/errors" + "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/fs/registry" ) @@ -34,6 +35,6 @@ func init() { // New returns an implementation to of the storage.FS interface that talk to // a ceph filesystem. -func New(m map[string]interface{}) (storage.FS, error) { +func New(m map[string]interface{}, _ events.Stream) (storage.FS, error) { return nil, errors.New("cephfs: revad was compiled without CephFS support") } diff --git a/pkg/storage/fs/eos/eos.go b/pkg/storage/fs/eos/eos.go index 96169cc9ad..e8b3f48225 100644 --- a/pkg/storage/fs/eos/eos.go +++ b/pkg/storage/fs/eos/eos.go @@ -19,6 +19,7 @@ package eos import ( + "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/fs/registry" "github.com/cs3org/reva/v2/pkg/storage/utils/eosfs" @@ -46,7 +47,7 @@ func parseConfig(m map[string]interface{}) (*eosfs.Config, error) { } // New returns a new implementation of the storage.FS interface that connects to EOS. -func New(m map[string]interface{}) (storage.FS, error) { +func New(m map[string]interface{}, _ events.Stream) (storage.FS, error) { c, err := parseConfig(m) if err != nil { return nil, err diff --git a/pkg/storage/fs/eosgrpc/eosgrpc.go b/pkg/storage/fs/eosgrpc/eosgrpc.go index bfb1aa0c8f..6ce66df7ea 100644 --- a/pkg/storage/fs/eosgrpc/eosgrpc.go +++ b/pkg/storage/fs/eosgrpc/eosgrpc.go @@ -19,6 +19,7 @@ package eosgrpc import ( + "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/fs/registry" "github.com/cs3org/reva/v2/pkg/storage/utils/eosfs" @@ -46,7 +47,7 @@ func parseConfig(m map[string]interface{}) (*eosfs.Config, error) { } // New returns a new implementation of the storage.FS interface that connects to EOS. -func New(m map[string]interface{}) (storage.FS, error) { +func New(m map[string]interface{}, _ events.Stream) (storage.FS, error) { c, err := parseConfig(m) if err != nil { return nil, err diff --git a/pkg/storage/fs/eosgrpchome/eosgrpchome.go b/pkg/storage/fs/eosgrpchome/eosgrpchome.go index c85308b149..8dc36527bb 100644 --- a/pkg/storage/fs/eosgrpchome/eosgrpchome.go +++ b/pkg/storage/fs/eosgrpchome/eosgrpchome.go @@ -19,6 +19,7 @@ package eosgrpchome import ( + "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/fs/registry" "github.com/cs3org/reva/v2/pkg/storage/utils/eosfs" @@ -46,7 +47,7 @@ func parseConfig(m map[string]interface{}) (*eosfs.Config, error) { } // New returns a new implementation of the storage.FS interface that connects to EOS. -func New(m map[string]interface{}) (storage.FS, error) { +func New(m map[string]interface{}, _ events.Stream) (storage.FS, error) { c, err := parseConfig(m) if err != nil { return nil, err diff --git a/pkg/storage/fs/eoshome/eoshome.go b/pkg/storage/fs/eoshome/eoshome.go index 435971becf..0e8cfb85d3 100644 --- a/pkg/storage/fs/eoshome/eoshome.go +++ b/pkg/storage/fs/eoshome/eoshome.go @@ -19,6 +19,7 @@ package eoshome import ( + "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/fs/registry" "github.com/cs3org/reva/v2/pkg/storage/utils/eosfs" @@ -46,7 +47,7 @@ func parseConfig(m map[string]interface{}) (*eosfs.Config, error) { } // New returns a new implementation of the storage.FS interface that connects to EOS. -func New(m map[string]interface{}) (storage.FS, error) { +func New(m map[string]interface{}, _ events.Stream) (storage.FS, error) { c, err := parseConfig(m) if err != nil { return nil, err diff --git a/pkg/storage/fs/local/local.go b/pkg/storage/fs/local/local.go index 24d5d1e175..a5510262a3 100644 --- a/pkg/storage/fs/local/local.go +++ b/pkg/storage/fs/local/local.go @@ -19,6 +19,7 @@ package local import ( + "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/fs/registry" "github.com/cs3org/reva/v2/pkg/storage/utils/localfs" @@ -48,7 +49,7 @@ func parseConfig(m map[string]interface{}) (*config, error) { // New returns an implementation to of the storage.FS interface that talks to // a local filesystem with user homes disabled. -func New(m map[string]interface{}) (storage.FS, error) { +func New(m map[string]interface{}, _ events.Stream) (storage.FS, error) { c, err := parseConfig(m) if err != nil { return nil, err diff --git a/pkg/storage/fs/localhome/localhome.go b/pkg/storage/fs/localhome/localhome.go index ad56727806..a442acbb52 100644 --- a/pkg/storage/fs/localhome/localhome.go +++ b/pkg/storage/fs/localhome/localhome.go @@ -19,6 +19,7 @@ package localhome import ( + "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/fs/registry" "github.com/cs3org/reva/v2/pkg/storage/utils/localfs" @@ -47,7 +48,7 @@ func parseConfig(m map[string]interface{}) (*config, error) { // New returns an implementation to of the storage.FS interface that talks to // a local filesystem with user homes. -func New(m map[string]interface{}) (storage.FS, error) { +func New(m map[string]interface{}, _ events.Stream) (storage.FS, error) { c, err := parseConfig(m) if err != nil { return nil, err diff --git a/pkg/storage/fs/nextcloud/nextcloud.go b/pkg/storage/fs/nextcloud/nextcloud.go index 41e173f789..de299c6d24 100644 --- a/pkg/storage/fs/nextcloud/nextcloud.go +++ b/pkg/storage/fs/nextcloud/nextcloud.go @@ -32,6 +32,7 @@ import ( "github.com/cs3org/reva/v2/pkg/appctx" ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" "github.com/cs3org/reva/v2/pkg/errtypes" + "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/fs/registry" "github.com/mitchellh/mapstructure" @@ -68,7 +69,7 @@ func parseConfig(m map[string]interface{}) (*StorageDriverConfig, error) { // New returns an implementation to of the storage.FS interface that talks to // a Nextcloud instance over http. -func New(m map[string]interface{}) (storage.FS, error) { +func New(m map[string]interface{}, _ events.Stream) (storage.FS, error) { conf, err := parseConfig(m) if err != nil { return nil, err diff --git a/pkg/storage/fs/nextcloud/nextcloud_test.go b/pkg/storage/fs/nextcloud/nextcloud_test.go index c33f366e75..8cb64a0a35 100644 --- a/pkg/storage/fs/nextcloud/nextcloud_test.go +++ b/pkg/storage/fs/nextcloud/nextcloud_test.go @@ -118,7 +118,7 @@ var _ = Describe("Nextcloud", func() { Describe("New", func() { It("returns a new instance", func() { - _, err := nextcloud.New(options) + _, err := nextcloud.New(options, nil) Expect(err).ToNot(HaveOccurred()) }) }) diff --git a/pkg/storage/fs/ocis/ocis.go b/pkg/storage/fs/ocis/ocis.go index d84d339334..812b44e4f7 100644 --- a/pkg/storage/fs/ocis/ocis.go +++ b/pkg/storage/fs/ocis/ocis.go @@ -21,6 +21,7 @@ package ocis import ( "path" + "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/fs/ocis/blobstore" "github.com/cs3org/reva/v2/pkg/storage/fs/registry" @@ -34,7 +35,7 @@ func init() { // New returns an implementation to of the storage.FS interface that talk to // a local filesystem. -func New(m map[string]interface{}) (storage.FS, error) { +func New(m map[string]interface{}, stream events.Stream) (storage.FS, error) { o, err := options.New(m) if err != nil { return nil, err @@ -45,5 +46,5 @@ func New(m map[string]interface{}) (storage.FS, error) { return nil, err } - return decomposedfs.NewDefault(m, bs) + return decomposedfs.NewDefault(m, bs, stream) } diff --git a/pkg/storage/fs/ocis/ocis_test.go b/pkg/storage/fs/ocis/ocis_test.go index ea20389af0..6e0ff5d046 100644 --- a/pkg/storage/fs/ocis/ocis_test.go +++ b/pkg/storage/fs/ocis/ocis_test.go @@ -53,7 +53,7 @@ var _ = Describe("Ocis", func() { Describe("New", func() { It("returns a new instance", func() { - _, err := ocis.New(options) + _, err := ocis.New(options, nil) Expect(err).ToNot(HaveOccurred()) }) }) diff --git a/pkg/storage/fs/owncloudsql/owncloudsql.go b/pkg/storage/fs/owncloudsql/owncloudsql.go index d919cdd2be..945e864d23 100644 --- a/pkg/storage/fs/owncloudsql/owncloudsql.go +++ b/pkg/storage/fs/owncloudsql/owncloudsql.go @@ -44,6 +44,7 @@ import ( "github.com/cs3org/reva/v2/pkg/appctx" ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" "github.com/cs3org/reva/v2/pkg/errtypes" + "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/logger" "github.com/cs3org/reva/v2/pkg/mime" "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" @@ -150,7 +151,7 @@ func (c *config) init(m map[string]interface{}) { // New returns an implementation to of the storage.FS interface that talk to // a local filesystem. -func New(m map[string]interface{}) (storage.FS, error) { +func New(m map[string]interface{}, _ events.Stream) (storage.FS, error) { c, err := parseConfig(m) if err != nil { return nil, err diff --git a/pkg/storage/fs/registry/registry.go b/pkg/storage/fs/registry/registry.go index a263fd9629..a75c8cf272 100644 --- a/pkg/storage/fs/registry/registry.go +++ b/pkg/storage/fs/registry/registry.go @@ -18,11 +18,14 @@ package registry -import "github.com/cs3org/reva/v2/pkg/storage" +import ( + "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/storage" +) // NewFunc is the function that storage implementations // should register at init time. -type NewFunc func(map[string]interface{}) (storage.FS, error) +type NewFunc func(map[string]interface{}, events.Stream) (storage.FS, error) // NewFuncs is a map containing all the registered storage backends. var NewFuncs = map[string]NewFunc{} diff --git a/pkg/storage/fs/s3/s3.go b/pkg/storage/fs/s3/s3.go index 1d6604cc3c..fb8f63a741 100644 --- a/pkg/storage/fs/s3/s3.go +++ b/pkg/storage/fs/s3/s3.go @@ -38,6 +38,7 @@ import ( types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" "github.com/cs3org/reva/v2/pkg/appctx" "github.com/cs3org/reva/v2/pkg/errtypes" + "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/mime" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/fs/registry" @@ -69,7 +70,7 @@ func parseConfig(m map[string]interface{}) (*config, error) { // New returns an implementation to of the storage.FS interface that talk to // a s3 api. -func New(m map[string]interface{}) (storage.FS, error) { +func New(m map[string]interface{}, _ events.Stream) (storage.FS, error) { c, err := parseConfig(m) if err != nil { return nil, err diff --git a/pkg/storage/fs/s3ng/s3ng.go b/pkg/storage/fs/s3ng/s3ng.go index e88e5a6ffb..5cc7f8873b 100644 --- a/pkg/storage/fs/s3ng/s3ng.go +++ b/pkg/storage/fs/s3ng/s3ng.go @@ -21,6 +21,7 @@ package s3ng import ( "fmt" + "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/fs/registry" "github.com/cs3org/reva/v2/pkg/storage/fs/s3ng/blobstore" @@ -33,7 +34,7 @@ func init() { // New returns an implementation to of the storage.FS interface that talk to // a local filesystem. -func New(m map[string]interface{}) (storage.FS, error) { +func New(m map[string]interface{}, stream events.Stream) (storage.FS, error) { o, err := parseConfig(m) if err != nil { return nil, err @@ -48,5 +49,5 @@ func New(m map[string]interface{}) (storage.FS, error) { return nil, err } - return decomposedfs.NewDefault(m, bs) + return decomposedfs.NewDefault(m, bs, stream) } diff --git a/pkg/storage/fs/s3ng/s3ng_test.go b/pkg/storage/fs/s3ng/s3ng_test.go index 732193723b..f37d557efa 100644 --- a/pkg/storage/fs/s3ng/s3ng_test.go +++ b/pkg/storage/fs/s3ng/s3ng_test.go @@ -58,12 +58,12 @@ var _ = Describe("S3ng", func() { Describe("New", func() { It("fails on missing s3 configuration", func() { - _, err := s3ng.New(map[string]interface{}{}) + _, err := s3ng.New(map[string]interface{}{}, nil) Expect(err).To(MatchError("S3 configuration incomplete")) }) It("works with complete configuration", func() { - _, err := s3ng.New(options) + _, err := s3ng.New(options, nil) Expect(err).ToNot(HaveOccurred()) }) }) diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go index 215bfe200b..6e58539211 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -23,10 +23,7 @@ package decomposedfs //go:generate make --no-print-directory -C ../../../.. mockery NAME=Tree import ( - "bytes" "context" - "crypto/tls" - "crypto/x509" "io" "net/url" "os" @@ -43,7 +40,6 @@ import ( ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" "github.com/cs3org/reva/v2/pkg/errtypes" "github.com/cs3org/reva/v2/pkg/events" - "github.com/cs3org/reva/v2/pkg/events/stream" "github.com/cs3org/reva/v2/pkg/logger" "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" "github.com/cs3org/reva/v2/pkg/storage" @@ -59,7 +55,6 @@ import ( "github.com/cs3org/reva/v2/pkg/storage/utils/templates" "github.com/cs3org/reva/v2/pkg/storagespace" "github.com/cs3org/reva/v2/pkg/utils" - "github.com/go-micro/plugins/v4/events/natsjs" "github.com/pkg/errors" ) @@ -100,7 +95,7 @@ type Decomposedfs struct { } // NewDefault returns an instance with default components -func NewDefault(m map[string]interface{}, bs tree.Blobstore) (storage.FS, error) { +func NewDefault(m map[string]interface{}, bs tree.Blobstore, es events.Stream) (storage.FS, error) { o, err := options.New(m) if err != nil { return nil, err @@ -118,44 +113,6 @@ func NewDefault(m map[string]interface{}, bs tree.Blobstore) (storage.FS, error) permissions := NewPermissions(node.NewPermissions(lu), permissionsClient) - var es events.Stream - if o.Events.NatsAddress != "" { - evtsCfg := o.Events - var ( - rootCAPool *x509.CertPool - tlsConf *tls.Config - ) - if evtsCfg.TLSRootCACertificate != "" { - rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate) - if err != nil { - return nil, err - } - - var certBytes bytes.Buffer - if _, err := io.Copy(&certBytes, rootCrtFile); err != nil { - return nil, err - } - - rootCAPool = x509.NewCertPool() - rootCAPool.AppendCertsFromPEM(certBytes.Bytes()) - evtsCfg.TLSInsecure = false - - tlsConf = &tls.Config{ - InsecureSkipVerify: evtsCfg.TLSInsecure, //nolint:gosec - RootCAs: rootCAPool, - } - } - - es, err = stream.Nats( - natsjs.TLSConfig(tlsConf), - natsjs.Address(evtsCfg.NatsAddress), - natsjs.ClusterID(evtsCfg.NatsClusterID), - ) - if err != nil { - return nil, err - } - } - return New(o, lu, permissions, tp, es) } diff --git a/pkg/storage/utils/decomposedfs/decomposedfs_test.go b/pkg/storage/utils/decomposedfs/decomposedfs_test.go index b3588c25f2..143e44ec17 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs_test.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs_test.go @@ -59,7 +59,7 @@ var _ = Describe("Decomposed", func() { bs := &treemocks.Blobstore{} _, err := decomposedfs.NewDefault(map[string]interface{}{ "root": env.Root, - }, bs) + }, bs, nil) Expect(err).ToNot(HaveOccurred()) }) }) diff --git a/tests/integration/grpc/gateway_storageprovider_test.go b/tests/integration/grpc/gateway_storageprovider_test.go index 8ea003ca9c..02e1585a51 100644 --- a/tests/integration/grpc/gateway_storageprovider_test.go +++ b/tests/integration/grpc/gateway_storageprovider_test.go @@ -182,7 +182,7 @@ var _ = Describe("gateway", func() { "enable_home": true, "treesize_accounting": true, "treetime_accounting": true, - }) + }, nil) Expect(err).ToNot(HaveOccurred()) res, err := shard1Fs.CreateStorageSpace(ctx, &storagep.CreateStorageSpaceRequest{ Type: "project", @@ -209,7 +209,7 @@ var _ = Describe("gateway", func() { "enable_home": true, "treesize_accounting": true, "treetime_accounting": true, - }) + }, nil) Expect(err).ToNot(HaveOccurred()) res, err = shard2Fs.CreateStorageSpace(ctx, &storagep.CreateStorageSpaceRequest{ Type: "project", @@ -370,7 +370,7 @@ var _ = Describe("gateway", func() { "enable_home": true, "treesize_accounting": true, "treetime_accounting": true, - }) + }, nil) Expect(err).ToNot(HaveOccurred()) r, err := serviceClient.CreateHome(ctx, &storagep.CreateHomeRequest{}) @@ -396,7 +396,7 @@ var _ = Describe("gateway", func() { "enable_home": true, "treesize_accounting": true, "treetime_accounting": true, - }) + }, nil) Expect(err).ToNot(HaveOccurred()) res, err := serviceClient.CreateStorageSpace(ctx, &storagep.CreateStorageSpaceRequest{ Type: "project", diff --git a/tests/integration/grpc/storageprovider_test.go b/tests/integration/grpc/storageprovider_test.go index c971449cf7..5404ba3f7f 100644 --- a/tests/integration/grpc/storageprovider_test.go +++ b/tests/integration/grpc/storageprovider_test.go @@ -28,6 +28,7 @@ import ( storagep "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/v2/pkg/auth/scope" ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" + "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/fs/nextcloud" @@ -55,7 +56,7 @@ func ref(provider string, path string) *storagep.Reference { func createFS(provider string, revads map[string]*Revad) (storage.FS, error) { conf := make(map[string]interface{}) - var f func(map[string]interface{}) (storage.FS, error) + var f func(map[string]interface{}, events.Stream) (storage.FS, error) switch provider { case "ocis": conf["root"] = revads["storage"].StorageRoot @@ -66,7 +67,7 @@ func createFS(provider string, revads map[string]*Revad) (storage.FS, error) { conf["mock_http"] = true f = nextcloud.New } - return f(conf) + return f(conf, nil) } // This test suite tests the gprc storageprovider interface using different From aeeab5d006bda129b40ff9e60f196d5a078c294c Mon Sep 17 00:00:00 2001 From: jkoberg Date: Fri, 13 Jan 2023 13:47:16 +0100 Subject: [PATCH 2/2] changelog Signed-off-by: jkoberg --- changelog/unreleased/pass-estream-to-storageproviders.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 changelog/unreleased/pass-estream-to-storageproviders.md diff --git a/changelog/unreleased/pass-estream-to-storageproviders.md b/changelog/unreleased/pass-estream-to-storageproviders.md new file mode 100644 index 0000000000..e1839dbe02 --- /dev/null +++ b/changelog/unreleased/pass-estream-to-storageproviders.md @@ -0,0 +1,5 @@ +Enhancement: Pass estream to Storage Providers + +Similar to the data providers we now pass the stream to the `New` func. This will reduce connections from storage providers to nats. + +https://github.com/cs3org/reva/pull/3598