Skip to content

Commit

Permalink
enhancement: introduce micro registry and client pool selectors
Browse files Browse the repository at this point in the history
  • Loading branch information
fschade committed Jun 6, 2023
1 parent ce240aa commit 458c6e4
Show file tree
Hide file tree
Showing 43 changed files with 1,390 additions and 1,229 deletions.
2 changes: 1 addition & 1 deletion cmd/revad/runtime/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
package runtime

import (
"github.com/cs3org/reva/v2/pkg/registry"
"github.com/rs/zerolog"
"go-micro.dev/v4/registry"
)

// Option defines a single option function.
Expand Down
18 changes: 3 additions & 15 deletions cmd/revad/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,11 @@ import (

"github.com/cs3org/reva/v2/cmd/revad/internal/grace"
"github.com/cs3org/reva/v2/pkg/logger"
"github.com/cs3org/reva/v2/pkg/registry/memory"
"github.com/cs3org/reva/v2/pkg/registry"
"github.com/cs3org/reva/v2/pkg/rgrpc"
"github.com/cs3org/reva/v2/pkg/rhttp"
"github.com/cs3org/reva/v2/pkg/sharedconf"
rtrace "github.com/cs3org/reva/v2/pkg/trace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/rs/zerolog"
Expand All @@ -55,19 +54,8 @@ func RunWithOptions(mainConf map[string]interface{}, pidFile string, opts ...Opt
parseSharedConfOrDie(mainConf["shared"])
coreConf := parseCoreConfOrDie(mainConf["core"])

// TODO: one can pass the options from the config file to registry.New() and initialize a registry based upon config files.
if options.Registry != nil {
utils.GlobalRegistry = options.Registry
} else if _, ok := mainConf["registry"]; ok {
for _, services := range mainConf["registry"].(map[string]interface{}) {
for sName, nodes := range services.(map[string]interface{}) {
for _, instance := range nodes.([]interface{}) {
if err := utils.GlobalRegistry.Add(memory.NewService(sName, instance.(map[string]interface{})["nodes"].([]interface{}))); err != nil {
panic(err)
}
}
}
}
if err := registry.Init(options.Registry); err != nil {
panic(err)
}

run(mainConf, coreConf, options.Logger, pidFile)
Expand Down
12 changes: 6 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ require (
github.com/sciencemesh/meshdirectory-web v1.0.4
github.com/sethvargo/go-password v0.2.0
github.com/shamaton/msgpack/v2 v2.1.1
github.com/stretchr/testify v1.8.1
github.com/stretchr/testify v1.8.3
github.com/studio-b12/gowebdav v0.0.0-20221015232716-17255f2e7423
github.com/test-go/testify v1.1.4
github.com/thanhpk/randstr v1.0.4
Expand All @@ -76,12 +76,11 @@ require (
go.etcd.io/etcd/client/v3 v3.5.5
go.opencensus.io v0.23.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.36.4
go.opentelemetry.io/otel v1.11.1
go.opentelemetry.io/otel v1.16.0
go.opentelemetry.io/otel/exporters/jaeger v1.11.1
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.1
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.11.1
go.opentelemetry.io/otel/sdk v1.11.1
go.opentelemetry.io/otel/trace v1.11.1
go.opentelemetry.io/otel/trace v1.16.0
golang.org/x/crypto v0.1.0
golang.org/x/oauth2 v0.1.0
golang.org/x/sync v0.1.0
Expand All @@ -91,7 +90,6 @@ require (
google.golang.org/genproto v0.0.0-20221024183307-1bc688fe9f3e
google.golang.org/grpc v1.50.1
google.golang.org/protobuf v1.28.1
gotest.tools v2.2.0+incompatible
)

require (
Expand Down Expand Up @@ -124,7 +122,7 @@ require (
github.com/go-git/go-git/v5 v5.4.2 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-micro/plugins/v4/registry/consul v1.1.0 // indirect
github.com/go-micro/plugins/v4/registry/etcd v1.1.0 // indirect
Expand Down Expand Up @@ -202,6 +200,8 @@ require (
go.etcd.io/etcd/client/pkg/v3 v3.5.5 // indirect
go.mongodb.org/mongo-driver v1.10.3 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.1 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.1 // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
Expand Down
327 changes: 317 additions & 10 deletions go.sum

Large diffs are not rendered by default.

44 changes: 31 additions & 13 deletions internal/http/services/owncloud/ocdav/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/cs3org/reva/v2/internal/http/services/owncloud/ocdav/spacelookup"
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/rhttp"
"github.com/cs3org/reva/v2/pkg/rhttp/router"
"github.com/cs3org/reva/v2/pkg/storagespace"
Expand Down Expand Up @@ -105,7 +106,7 @@ func (s *svc) handlePathCopy(w http.ResponseWriter, r *http.Request, ns string)

sublog := appctx.GetLogger(ctx).With().Str("src", src).Str("dst", dst).Logger()

srcSpace, status, err := spacelookup.LookUpStorageSpaceForPath(ctx, s.gwClient, src)
srcSpace, status, err := spacelookup.LookUpStorageSpaceForPath(ctx, s.gatewaySelector, src)
if err != nil {
sublog.Error().Err(err).Str("path", src).Msg("failed to look up storage space")
w.WriteHeader(http.StatusInternalServerError)
Expand All @@ -115,7 +116,7 @@ func (s *svc) handlePathCopy(w http.ResponseWriter, r *http.Request, ns string)
errors.HandleErrorStatus(&sublog, w, status)
return
}
dstSpace, status, err := spacelookup.LookUpStorageSpaceForPath(ctx, s.gwClient, dst)
dstSpace, status, err := spacelookup.LookUpStorageSpaceForPath(ctx, s.gatewaySelector, dst)
if err != nil {
sublog.Error().Err(err).Str("path", dst).Msg("failed to look up storage space")
w.WriteHeader(http.StatusInternalServerError)
Expand All @@ -131,17 +132,22 @@ func (s *svc) handlePathCopy(w http.ResponseWriter, r *http.Request, ns string)
return
}

if err := s.executePathCopy(ctx, s.gwClient, w, r, cp); err != nil {
if err := s.executePathCopy(ctx, s.gatewaySelector, w, r, cp); err != nil {
sublog.Error().Err(err).Str("depth", cp.depth.String()).Msg("error executing path copy")
w.WriteHeader(http.StatusInternalServerError)
}
w.WriteHeader(cp.successCode)
}

func (s *svc) executePathCopy(ctx context.Context, client gateway.GatewayAPIClient, w http.ResponseWriter, r *http.Request, cp *copy) error {
func (s *svc) executePathCopy(ctx context.Context, selector pool.Selectable[gateway.GatewayAPIClient], w http.ResponseWriter, r *http.Request, cp *copy) error {
log := appctx.GetLogger(ctx)
log.Debug().Str("src", cp.sourceInfo.Path).Str("dst", cp.destination.Path).Msg("descending")

client, err := selector.Next()
if err != nil {
return err
}

var fileid string
if cp.sourceInfo.Type == provider.ResourceType_RESOURCE_TYPE_CONTAINER {
// create dir
Expand Down Expand Up @@ -192,7 +198,7 @@ func (s *svc) executePathCopy(ctx context.Context, client gateway.GatewayAPIClie
ResourceId: cp.destination.ResourceId,
Path: utils.MakeRelativePath(filepath.Join(cp.destination.Path, child)),
}
err := s.executePathCopy(ctx, client, w, r, &copy{source: src, sourceInfo: res.Infos[i], destination: childDst, depth: cp.depth, successCode: cp.successCode})
err := s.executePathCopy(ctx, selector, w, r, &copy{source: src, sourceInfo: res.Infos[i], destination: childDst, depth: cp.depth, successCode: cp.successCode})
if err != nil {
return err
}
Expand Down Expand Up @@ -354,18 +360,23 @@ func (s *svc) handleSpacesCopy(w http.ResponseWriter, r *http.Request, spaceID s
return
}

err = s.executeSpacesCopy(ctx, w, s.gwClient, cp)
err = s.executeSpacesCopy(ctx, w, s.gatewaySelector, cp)
if err != nil {
sublog.Error().Err(err).Str("depth", cp.depth.String()).Msg("error descending directory")
w.WriteHeader(http.StatusInternalServerError)
}
w.WriteHeader(cp.successCode)
}

func (s *svc) executeSpacesCopy(ctx context.Context, w http.ResponseWriter, client gateway.GatewayAPIClient, cp *copy) error {
func (s *svc) executeSpacesCopy(ctx context.Context, w http.ResponseWriter, selector pool.Selectable[gateway.GatewayAPIClient], cp *copy) error {
log := appctx.GetLogger(ctx)
log.Debug().Interface("src", cp.sourceInfo).Interface("dst", cp.destination).Msg("descending")

client, err := selector.Next()
if err != nil {
return err
}

var fileid string
if cp.sourceInfo.Type == provider.ResourceType_RESOURCE_TYPE_CONTAINER {
// create dir
Expand Down Expand Up @@ -410,7 +421,7 @@ func (s *svc) executeSpacesCopy(ctx context.Context, w http.ResponseWriter, clie
ResourceId: cp.destination.ResourceId,
Path: utils.MakeRelativePath(path.Join(cp.destination.Path, res.Infos[i].Path)),
}
err := s.executeSpacesCopy(ctx, w, client, &copy{sourceInfo: res.Infos[i], destination: childRef, depth: cp.depth, successCode: cp.successCode})
err := s.executeSpacesCopy(ctx, w, selector, &copy{sourceInfo: res.Infos[i], destination: childRef, depth: cp.depth, successCode: cp.successCode})
if err != nil {
return err
}
Expand Down Expand Up @@ -528,7 +539,7 @@ func (s *svc) executeSpacesCopy(ctx context.Context, w http.ResponseWriter, clie
}

func (s *svc) prepareCopy(ctx context.Context, w http.ResponseWriter, r *http.Request, srcRef, dstRef *provider.Reference, log *zerolog.Logger) *copy {
isChild, err := s.referenceIsChildOf(ctx, s.gwClient, dstRef, srcRef)
isChild, err := s.referenceIsChildOf(ctx, s.gatewaySelector, dstRef, srcRef)
if err != nil {
switch err.(type) {
case errtypes.IsNotSupported:
Expand Down Expand Up @@ -573,8 +584,15 @@ func (s *svc) prepareCopy(ctx context.Context, w http.ResponseWriter, r *http.Re

log.Debug().Bool("overwrite", overwrite).Str("depth", depth.String()).Msg("copy")

client, err := s.gatewaySelector.Next()
if err != nil {
log.Error().Err(err).Msg("error selecting next client")
w.WriteHeader(http.StatusInternalServerError)
return nil
}

srcStatReq := &provider.StatRequest{Ref: srcRef}
srcStatRes, err := s.gwClient.Stat(ctx, srcStatReq)
srcStatRes, err := client.Stat(ctx, srcStatReq)
switch {
case err != nil:
log.Error().Err(err).Msg("error sending grpc stat request")
Expand All @@ -592,7 +610,7 @@ func (s *svc) prepareCopy(ctx context.Context, w http.ResponseWriter, r *http.Re
}

dstStatReq := &provider.StatRequest{Ref: dstRef}
dstStatRes, err := s.gwClient.Stat(ctx, dstStatReq)
dstStatRes, err := client.Stat(ctx, dstStatReq)
switch {
case err != nil:
log.Error().Err(err).Msg("error sending grpc stat request")
Expand Down Expand Up @@ -621,7 +639,7 @@ func (s *svc) prepareCopy(ctx context.Context, w http.ResponseWriter, r *http.Re
(dstStatRes.Info.Type == provider.ResourceType_RESOURCE_TYPE_FILE &&
srcStatRes.Info.Type == provider.ResourceType_RESOURCE_TYPE_CONTAINER) {
delReq := &provider.DeleteRequest{Ref: dstRef}
delRes, err := s.gwClient.Delete(ctx, delReq)
delRes, err := client.Delete(ctx, delReq)
if err != nil {
log.Error().Err(err).Msg("error sending grpc delete request")
w.WriteHeader(http.StatusInternalServerError)
Expand All @@ -640,7 +658,7 @@ func (s *svc) prepareCopy(ctx context.Context, w http.ResponseWriter, r *http.Re
Path: utils.MakeRelativePath(p),
}
intStatReq := &provider.StatRequest{Ref: pRef}
intStatRes, err := s.gwClient.Stat(ctx, intStatReq)
intStatRes, err := client.Stat(ctx, intStatReq)
if err != nil {
log.Error().Err(err).Msg("error sending grpc stat request")
w.WriteHeader(http.StatusInternalServerError)
Expand Down
26 changes: 20 additions & 6 deletions internal/http/services/owncloud/ocdav/dav.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cs3org/reva/v2/internal/http/services/owncloud/ocdav/net"
"github.com/cs3org/reva/v2/pkg/appctx"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/rhttp/router"
"github.com/cs3org/reva/v2/pkg/utils"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -188,7 +189,7 @@ func (h *DavHandler) Handler(s *svc) http.Handler {
var pass string
var err error
if _, pass, hasValidBasicAuthHeader = r.BasicAuth(); hasValidBasicAuthHeader {
res, err = handleBasicAuth(r.Context(), s.gwClient, token, pass)
res, err = handleBasicAuth(r.Context(), s.gatewaySelector, token, pass)
} else {
q := r.URL.Query()
sig := q.Get("signature")
Expand All @@ -198,7 +199,7 @@ func (h *DavHandler) Handler(s *svc) http.Handler {
w.WriteHeader(http.StatusUnauthorized)
return
}
res, err = handleSignatureAuth(r.Context(), s.gwClient, token, sig, expiration)
res, err = handleSignatureAuth(r.Context(), s.gatewaySelector, token, sig, expiration)
}

switch {
Expand Down Expand Up @@ -232,7 +233,7 @@ func (h *DavHandler) Handler(s *svc) http.Handler {
r = r.WithContext(ctx)

// the public share manager knew the token, but does the referenced target still exist?
sRes, err := getTokenStatInfo(ctx, s.gwClient, token)
sRes, err := getTokenStatInfo(ctx, s.gatewaySelector, token)
switch {
case err != nil:
log.Error().Err(err).Msg("error sending grpc stat request")
Expand Down Expand Up @@ -271,7 +272,12 @@ func (h *DavHandler) Handler(s *svc) http.Handler {
})
}

func getTokenStatInfo(ctx context.Context, client gatewayv1beta1.GatewayAPIClient, token string) (*provider.StatResponse, error) {
func getTokenStatInfo(ctx context.Context, selector pool.Selectable[gatewayv1beta1.GatewayAPIClient], token string) (*provider.StatResponse, error) {
client, err := selector.Next()
if err != nil {
return nil, err
}

return client.Stat(ctx, &provider.StatRequest{Ref: &provider.Reference{
ResourceId: &provider.ResourceId{
StorageId: utils.PublicStorageProviderID,
Expand All @@ -281,7 +287,11 @@ func getTokenStatInfo(ctx context.Context, client gatewayv1beta1.GatewayAPIClien
}})
}

func handleBasicAuth(ctx context.Context, c gatewayv1beta1.GatewayAPIClient, token, pw string) (*gatewayv1beta1.AuthenticateResponse, error) {
func handleBasicAuth(ctx context.Context, selector pool.Selectable[gatewayv1beta1.GatewayAPIClient], token, pw string) (*gatewayv1beta1.AuthenticateResponse, error) {
c, err := selector.Next()
if err != nil {
return nil, err
}
authenticateRequest := gatewayv1beta1.AuthenticateRequest{
Type: "publicshares",
ClientId: token,
Expand All @@ -291,7 +301,11 @@ func handleBasicAuth(ctx context.Context, c gatewayv1beta1.GatewayAPIClient, tok
return c.Authenticate(ctx, &authenticateRequest)
}

func handleSignatureAuth(ctx context.Context, c gatewayv1beta1.GatewayAPIClient, token, sig, expiration string) (*gatewayv1beta1.AuthenticateResponse, error) {
func handleSignatureAuth(ctx context.Context, selector pool.Selectable[gatewayv1beta1.GatewayAPIClient], token, sig, expiration string) (*gatewayv1beta1.AuthenticateResponse, error) {
c, err := selector.Next()
if err != nil {
return nil, err
}
authenticateRequest := gatewayv1beta1.AuthenticateRequest{
Type: "publicshares",
ClientId: token,
Expand Down
11 changes: 8 additions & 3 deletions internal/http/services/owncloud/ocdav/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (s *svc) handlePathDelete(w http.ResponseWriter, r *http.Request, ns string

fn := path.Join(ns, r.URL.Path)

space, rpcStatus, err := spacelookup.LookUpStorageSpaceForPath(ctx, s.gwClient, fn)
space, rpcStatus, err := spacelookup.LookUpStorageSpaceForPath(ctx, s.gatewaySelector, fn)
switch {
case err != nil:
span.RecordError(err)
Expand Down Expand Up @@ -73,7 +73,12 @@ func (s *svc) handleDelete(ctx context.Context, w http.ResponseWriter, r *http.R
return http.StatusBadRequest, errtypes.BadRequest("invalid if header")
}

res, err := s.gwClient.Delete(ctx, req)
client, err := s.gatewaySelector.Next()
if err != nil {
return http.StatusInternalServerError, errtypes.InternalError(err.Error())
}

res, err := client.Delete(ctx, req)
switch {
case err != nil:
span.RecordError(err)
Expand All @@ -92,7 +97,7 @@ func (s *svc) handleDelete(ctx context.Context, w http.ResponseWriter, r *http.R
status = http.StatusLocked
}
// check if user has access to resource
sRes, err := s.gwClient.Stat(ctx, &provider.StatRequest{Ref: ref})
sRes, err := client.Stat(ctx, &provider.StatRequest{Ref: ref})
if err != nil {
span.RecordError(err)
return http.StatusInternalServerError, err
Expand Down
12 changes: 9 additions & 3 deletions internal/http/services/owncloud/ocdav/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (s *svc) handlePathGet(w http.ResponseWriter, r *http.Request, ns string) {

sublog := appctx.GetLogger(ctx).With().Str("path", fn).Str("svc", "ocdav").Str("handler", "get").Logger()

space, status, err := spacelookup.LookUpStorageSpaceForPath(ctx, s.gwClient, fn)
space, status, err := spacelookup.LookUpStorageSpaceForPath(ctx, s.gatewaySelector, fn)
if err != nil {
sublog.Error().Err(err).Str("path", fn).Msg("failed to look up storage space")
w.WriteHeader(http.StatusInternalServerError)
Expand All @@ -60,10 +60,16 @@ func (s *svc) handlePathGet(w http.ResponseWriter, r *http.Request, ns string) {
}

func (s *svc) handleGet(ctx context.Context, w http.ResponseWriter, r *http.Request, ref *provider.Reference, dlProtocol string, log zerolog.Logger) {
client, err := s.gatewaySelector.Next()
if err != nil {
log.Error().Err(err).Msg("error selecting next client")
w.WriteHeader(http.StatusInternalServerError)
return
}
sReq := &provider.StatRequest{
Ref: ref,
}
sRes, err := s.gwClient.Stat(ctx, sReq)
sRes, err := client.Stat(ctx, sReq)
if err != nil {
log.Error().Err(err).Msg("error stat resource")
w.WriteHeader(http.StatusInternalServerError)
Expand All @@ -85,7 +91,7 @@ func (s *svc) handleGet(ctx context.Context, w http.ResponseWriter, r *http.Requ
}

dReq := &provider.InitiateFileDownloadRequest{Ref: ref}
dRes, err := s.gwClient.InitiateFileDownload(ctx, dReq)
dRes, err := client.InitiateFileDownload(ctx, dReq)
switch {
case err != nil:
log.Error().Err(err).Msg("error initiating file download")
Expand Down
Loading

0 comments on commit 458c6e4

Please sign in to comment.