Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass Estream to StorageProviders #3598

Merged
merged 2 commits into from
Feb 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/unreleased/pass-estream-to-storageproviders.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: Pass estream to Storage Providers

Similar to the data providers we now pass the stream to the `New` func. This will reduce connections from storage providers to nats.

https://github.com/cs3org/reva/pull/3598
61 changes: 60 additions & 1 deletion internal/grpc/services/storageprovider/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@
package storageprovider

import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io"
"net/url"
"os"
"path"
Expand All @@ -35,6 +39,8 @@ import (
"github.com/cs3org/reva/v2/pkg/appctx"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/cs3org/reva/v2/pkg/mime"
"github.com/cs3org/reva/v2/pkg/rgrpc"
"github.com/cs3org/reva/v2/pkg/rgrpc/status"
Expand All @@ -44,6 +50,7 @@ import (
"github.com/cs3org/reva/v2/pkg/storagespace"
rtrace "github.com/cs3org/reva/v2/pkg/trace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/go-micro/plugins/v4/events/natsjs"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/attribute"
Expand All @@ -66,6 +73,15 @@ type config struct {
CustomMimeTypesJSON string `mapstructure:"custom_mimetypes_json" docs:"nil;An optional mapping file with the list of supported custom file extensions and corresponding mime types."`
MountID string `mapstructure:"mount_id"`
UploadExpiration int64 `mapstructure:"upload_expiration" docs:"0;Duration for how long uploads will be valid."`
Events eventconfig `mapstructure:"events" docs:"0;Event stream configuration"`
}

type eventconfig struct {
NatsAddress string `mapstructure:"nats_address" docs:"address of the nats server"`
NatsClusterID string `mapstructure:"nats_clusterid" docs:"clusterid of the nats server"`
EnableTLS bool `mapstructure:"nats_enable_tls" docs:"events tls switch"`
TLSInsecure bool `mapstructure:"tls_insecure" docs:"Whether to verify the server TLS certificates."`
TLSRootCACertificate string `mapstructure:"tls_root_ca_cert" docs:"The root CA certificate used to validate the server's TLS certificate."`
}

func (c *config) init() {
Expand Down Expand Up @@ -1182,9 +1198,15 @@ func (s *service) addMissingStorageProviderID(resourceID *provider.ResourceId, s
}

func getFS(c *config) (storage.FS, error) {
evstream, err := estreamFromConfig(c.Events)
if err != nil {
return nil, err
}

if f, ok := registry.NewFuncs[c.Driver]; ok {
return f(c.Drivers[c.Driver])
return f(c.Drivers[c.Driver], evstream)
}

return nil, errtypes.NotFound("driver not found: " + c.Driver)
}

Expand All @@ -1201,3 +1223,40 @@ func (v descendingMtime) Less(i, j int) bool {
func (v descendingMtime) Swap(i, j int) {
v[i], v[j] = v[j], v[i]
}

func estreamFromConfig(c eventconfig) (events.Stream, error) {
if c.NatsAddress == "" {
return nil, nil
}
var (
rootCAPool *x509.CertPool
tlsConf *tls.Config
)
if c.TLSRootCACertificate != "" {
rootCrtFile, err := os.Open(c.TLSRootCACertificate)
if err != nil {
return nil, err
}

var certBytes bytes.Buffer
if _, err := io.Copy(&certBytes, rootCrtFile); err != nil {
return nil, err
}

rootCAPool = x509.NewCertPool()
rootCAPool.AppendCertsFromPEM(certBytes.Bytes())
c.TLSInsecure = false

tlsConf = &tls.Config{
InsecureSkipVerify: c.TLSInsecure, //nolint:gosec
RootCAs: rootCAPool,
}
}

s, err := stream.Nats(natsjs.Address(c.NatsAddress), natsjs.ClusterID(c.NatsClusterID), natsjs.TLSConfig(tlsConf))
if err != nil {
return nil, err
}

return s, nil
}
22 changes: 12 additions & 10 deletions internal/http/services/dataprovider/dataprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,7 @@ func New(m map[string]interface{}, log *zerolog.Logger) (global.Service, error)

conf.init()

fs, err := getFS(conf)
if err != nil {
return nil, err
}

var publisher events.Publisher
var evstream events.Stream

if conf.NatsAddress == "" || conf.NatsClusterID == "" {
log.Warn().Msg("missing or incomplete nats configuration. Events will not be published.")
Expand Down Expand Up @@ -114,13 +109,20 @@ func New(m map[string]interface{}, log *zerolog.Logger) (global.Service, error)
RootCAs: rootCAPool,
}
}
publisher, err = stream.Nats(natsjs.TLSConfig(tlsConf), natsjs.Address(conf.NatsAddress), natsjs.ClusterID(conf.NatsClusterID))
s, err := stream.Nats(natsjs.TLSConfig(tlsConf), natsjs.Address(conf.NatsAddress), natsjs.ClusterID(conf.NatsClusterID))
if err != nil {
return nil, err
}

evstream = s
}

fs, err := getFS(conf, evstream)
if err != nil {
return nil, err
}

dataTXs, err := getDataTXs(conf, fs, publisher)
dataTXs, err := getDataTXs(conf, fs, evstream)
if err != nil {
return nil, err
}
Expand All @@ -135,9 +137,9 @@ func New(m map[string]interface{}, log *zerolog.Logger) (global.Service, error)
return s, err
}

func getFS(c *config) (storage.FS, error) {
func getFS(c *config, stream events.Stream) (storage.FS, error) {
if f, ok := registry.NewFuncs[c.Driver]; ok {
return f(c.Drivers[c.Driver])
return f(c.Drivers[c.Driver], stream)
}
return nil, fmt.Errorf("driver not found: %s", c.Driver)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/cbox/storage/eoshomewrapper/eoshomewrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/fs/registry"
"github.com/cs3org/reva/v2/pkg/storage/utils/eosfs"
Expand Down Expand Up @@ -65,7 +66,7 @@ func parseConfig(m map[string]interface{}) (*eosfs.Config, string, error) {

// New returns an implementation of the storage.FS interface that forms a wrapper
// around separate connections to EOS.
func New(m map[string]interface{}) (storage.FS, error) {
func New(m map[string]interface{}, _ events.Stream) (storage.FS, error) {
c, t, err := parseConfig(m)
if err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion pkg/cbox/storage/eoswrapper/eoswrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/fs/registry"
"github.com/cs3org/reva/v2/pkg/storage/utils/eosfs"
Expand Down Expand Up @@ -85,7 +86,7 @@ func parseConfig(m map[string]interface{}) (*eosfs.Config, string, error) {

// New returns an implementation of the storage.FS interface that forms a wrapper
// around separate connections to EOS.
func New(m map[string]interface{}) (storage.FS, error) {
func New(m map[string]interface{}, _ events.Stream) (storage.FS, error) {
c, t, err := parseConfig(m)
if err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/fs/cephfs/cephfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/fs/registry"
"github.com/mitchellh/mapstructure"
Expand Down Expand Up @@ -64,7 +65,7 @@ func init() {

// New returns an implementation to of the storage.FS interface that talk to
// a ceph filesystem.
func New(m map[string]interface{}) (fs storage.FS, err error) {
func New(m map[string]interface{}, _ events.Stream) (fs storage.FS, err error) {
c := &Options{}
if err = mapstructure.Decode(m, c); err != nil {
return nil, errors.Wrap(err, "error decoding conf")
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/fs/cephfs/unsupported.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package cephfs
import (
"github.com/pkg/errors"

"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/fs/registry"
)
Expand All @@ -34,6 +35,6 @@ func init() {

// New returns an implementation to of the storage.FS interface that talk to
// a ceph filesystem.
func New(m map[string]interface{}) (storage.FS, error) {
func New(m map[string]interface{}, _ events.Stream) (storage.FS, error) {
return nil, errors.New("cephfs: revad was compiled without CephFS support")
}
3 changes: 2 additions & 1 deletion pkg/storage/fs/eos/eos.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package eos

import (
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/fs/registry"
"github.com/cs3org/reva/v2/pkg/storage/utils/eosfs"
Expand Down Expand Up @@ -46,7 +47,7 @@ func parseConfig(m map[string]interface{}) (*eosfs.Config, error) {
}

// New returns a new implementation of the storage.FS interface that connects to EOS.
func New(m map[string]interface{}) (storage.FS, error) {
func New(m map[string]interface{}, _ events.Stream) (storage.FS, error) {
c, err := parseConfig(m)
if err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/fs/eosgrpc/eosgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package eosgrpc

import (
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/fs/registry"
"github.com/cs3org/reva/v2/pkg/storage/utils/eosfs"
Expand Down Expand Up @@ -46,7 +47,7 @@ func parseConfig(m map[string]interface{}) (*eosfs.Config, error) {
}

// New returns a new implementation of the storage.FS interface that connects to EOS.
func New(m map[string]interface{}) (storage.FS, error) {
func New(m map[string]interface{}, _ events.Stream) (storage.FS, error) {
c, err := parseConfig(m)
if err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/fs/eosgrpchome/eosgrpchome.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package eosgrpchome

import (
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/fs/registry"
"github.com/cs3org/reva/v2/pkg/storage/utils/eosfs"
Expand Down Expand Up @@ -46,7 +47,7 @@ func parseConfig(m map[string]interface{}) (*eosfs.Config, error) {
}

// New returns a new implementation of the storage.FS interface that connects to EOS.
func New(m map[string]interface{}) (storage.FS, error) {
func New(m map[string]interface{}, _ events.Stream) (storage.FS, error) {
c, err := parseConfig(m)
if err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/fs/eoshome/eoshome.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package eoshome

import (
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/fs/registry"
"github.com/cs3org/reva/v2/pkg/storage/utils/eosfs"
Expand Down Expand Up @@ -46,7 +47,7 @@ func parseConfig(m map[string]interface{}) (*eosfs.Config, error) {
}

// New returns a new implementation of the storage.FS interface that connects to EOS.
func New(m map[string]interface{}) (storage.FS, error) {
func New(m map[string]interface{}, _ events.Stream) (storage.FS, error) {
c, err := parseConfig(m)
if err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/fs/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package local

import (
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/fs/registry"
"github.com/cs3org/reva/v2/pkg/storage/utils/localfs"
Expand Down Expand Up @@ -48,7 +49,7 @@ func parseConfig(m map[string]interface{}) (*config, error) {

// New returns an implementation to of the storage.FS interface that talks to
// a local filesystem with user homes disabled.
func New(m map[string]interface{}) (storage.FS, error) {
func New(m map[string]interface{}, _ events.Stream) (storage.FS, error) {
c, err := parseConfig(m)
if err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/fs/localhome/localhome.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package localhome

import (
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/fs/registry"
"github.com/cs3org/reva/v2/pkg/storage/utils/localfs"
Expand Down Expand Up @@ -47,7 +48,7 @@ func parseConfig(m map[string]interface{}) (*config, error) {

// New returns an implementation to of the storage.FS interface that talks to
// a local filesystem with user homes.
func New(m map[string]interface{}) (storage.FS, error) {
func New(m map[string]interface{}, _ events.Stream) (storage.FS, error) {
c, err := parseConfig(m)
if err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/fs/nextcloud/nextcloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cs3org/reva/v2/pkg/appctx"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/fs/registry"
"github.com/mitchellh/mapstructure"
Expand Down Expand Up @@ -68,7 +69,7 @@ func parseConfig(m map[string]interface{}) (*StorageDriverConfig, error) {

// New returns an implementation to of the storage.FS interface that talks to
// a Nextcloud instance over http.
func New(m map[string]interface{}) (storage.FS, error) {
func New(m map[string]interface{}, _ events.Stream) (storage.FS, error) {
conf, err := parseConfig(m)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/fs/nextcloud/nextcloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ var _ = Describe("Nextcloud", func() {

Describe("New", func() {
It("returns a new instance", func() {
_, err := nextcloud.New(options)
_, err := nextcloud.New(options, nil)
Expect(err).ToNot(HaveOccurred())
})
})
Expand Down
5 changes: 3 additions & 2 deletions pkg/storage/fs/ocis/ocis.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package ocis
import (
"path"

"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/fs/ocis/blobstore"
"github.com/cs3org/reva/v2/pkg/storage/fs/registry"
Expand All @@ -34,7 +35,7 @@ func init() {

// New returns an implementation to of the storage.FS interface that talk to
// a local filesystem.
func New(m map[string]interface{}) (storage.FS, error) {
func New(m map[string]interface{}, stream events.Stream) (storage.FS, error) {
o, err := options.New(m)
if err != nil {
return nil, err
Expand All @@ -45,5 +46,5 @@ func New(m map[string]interface{}) (storage.FS, error) {
return nil, err
}

return decomposedfs.NewDefault(m, bs)
return decomposedfs.NewDefault(m, bs, stream)
}
2 changes: 1 addition & 1 deletion pkg/storage/fs/ocis/ocis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ var _ = Describe("Ocis", func() {

Describe("New", func() {
It("returns a new instance", func() {
_, err := ocis.New(options)
_, err := ocis.New(options, nil)
Expect(err).ToNot(HaveOccurred())
})
})
Expand Down
Loading