Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Traces: Use OpenTelemetry #1984

Merged
merged 15 commits into from
Aug 17, 2021
9 changes: 9 additions & 0 deletions changelog/unreleased/opentelemetry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Enhancement: Replace OpenCensus with OpenTelemetry

OpenTelemetry](https://opentelemetry.io/docs/concepts/what-is-opentelemetry/) is an [open standard](https://github.com/open-telemetry/opentelemetry-specification) a sandbox CNCF project and it was formed through a merger of the OpenTracing and OpenCensus.

> OpenCensus and OpenTracing have merged to form OpenTelemetry, which serves as the next major version of OpenCensus and OpenTracing. OpenTelemetry will offer backwards compatibility with existing OpenCensus integrations, and we will continue to make security patches to existing OpenCensus libraries for two years.

There is a lot of outdated documentation as a result of this merger, and we will be better off adopting the latest standard and libraries.

https://github.com/cs3org/reva/pull/1984
62 changes: 12 additions & 50 deletions cmd/revad/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,17 @@ import (
"strconv"
"strings"

"github.com/cs3org/reva/pkg/registry/memory"

"github.com/cs3org/reva/pkg/utils"

"contrib.go.opencensus.io/exporter/jaeger"
"github.com/cs3org/reva/cmd/revad/internal/grace"
"github.com/cs3org/reva/pkg/logger"
"github.com/cs3org/reva/pkg/registry/memory"
"github.com/cs3org/reva/pkg/rgrpc"
"github.com/cs3org/reva/pkg/rhttp"
"github.com/cs3org/reva/pkg/sharedconf"
rtrace "github.com/cs3org/reva/pkg/trace"
"github.com/cs3org/reva/pkg/utils"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"go.opencensus.io/plugin/ocgrpc"
"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/stats/view"
"go.opencensus.io/trace"
)

// Run runs a reva server with the given config file and pid file.
Expand Down Expand Up @@ -84,14 +78,18 @@ type coreConf struct {
TracingEndpoint string `mapstructure:"tracing_endpoint"`
TracingCollector string `mapstructure:"tracing_collector"`
TracingServiceName string `mapstructure:"tracing_service_name"`

// TracingService specifies the service. i.e OpenCensus, OpenTelemetry, OpenTracing...
TracingService string `mapstructure:"tracing_service"`
}

func run(mainConf map[string]interface{}, coreConf *coreConf, logger *zerolog.Logger, filename string) {
host, _ := os.Hostname()
logger.Info().Msgf("host info: %s", host)

// initRegistry()
initTracing(coreConf, logger)
if coreConf.TracingEnabled {
initTracing(coreConf)
}
initCPUCount(coreConf, logger)

servers := initServers(mainConf, logger)
Expand Down Expand Up @@ -144,18 +142,14 @@ func initServers(mainConf map[string]interface{}, log *zerolog.Logger) map[strin
}

if len(servers) == 0 {
// nothing to do
log.Info().Msg("nothing to do, no grpc/http enabled_services declared in config")
os.Exit(1)
}
return servers
}

func initTracing(conf *coreConf, log *zerolog.Logger) {
if err := setupOpenCensus(conf); err != nil {
log.Error().Err(err).Msg("error configuring open census stats and tracing")
os.Exit(1)
}
func initTracing(conf *coreConf) {
rtrace.SetTraceProvider(conf.TracingCollector)
}

func initCPUCount(conf *coreConf, log *zerolog.Logger) {
Expand Down Expand Up @@ -270,39 +264,6 @@ func getHTTPServer(conf interface{}, l *zerolog.Logger) (*rhttp.Server, error) {
return s, nil
}

func setupOpenCensus(conf *coreConf) error {
if err := view.Register(ochttp.DefaultServerViews...); err != nil {
return err
}

if err := view.Register(ocgrpc.DefaultServerViews...); err != nil {
return err
}

if !conf.TracingEnabled {
return nil
}

if conf.TracingServiceName == "" {
conf.TracingServiceName = "revad"
}

je, err := jaeger.NewExporter(jaeger.Options{
AgentEndpoint: conf.TracingEndpoint,
CollectorEndpoint: conf.TracingCollector,
ServiceName: conf.TracingServiceName,
})

if err != nil {
return err
}

// register it as a trace exporter
trace.RegisterExporter(je)
trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()})
return nil
}

// adjustCPU parses string cpu and sets GOMAXPROCS
// according to its value. It accepts either
// a number (e.g. 3) or a percent (e.g. 50%).
Expand Down Expand Up @@ -349,6 +310,7 @@ func parseCoreConfOrDie(v interface{}) *coreConf {
fmt.Fprintf(os.Stderr, "error decoding core config: %s\n", err.Error())
os.Exit(1)
}

return c
}

Expand Down
8 changes: 7 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,15 @@ require (
github.com/tus/tusd v1.1.1-0.20200416115059-9deabf9d80c2
go.mongodb.org/mongo-driver v1.5.1 // indirect
go.opencensus.io v0.23.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.22.0
go.opentelemetry.io/otel v1.0.0-RC2
go.opentelemetry.io/otel/exporters/jaeger v1.0.0-RC2
go.opentelemetry.io/otel/metric v0.20.0 // indirect
go.opentelemetry.io/otel/sdk v1.0.0-RC2
go.opentelemetry.io/otel/trace v1.0.0-RC2
golang.org/x/crypto v0.0.0-20201216223049-8b5274cf687f
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/sys v0.0.0-20210423082822-04245dca01da
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1
google.golang.org/genproto v0.0.0-20200806141610-86f49bd18e98 // indirect
google.golang.org/grpc v1.39.1
Expand Down
19 changes: 19 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,23 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opentelemetry.io/contrib v0.22.0 h1:0F7gDEjgb1WGn4ODIjaCAg75hmqF+UN0LiVgwxsCodc=
go.opentelemetry.io/contrib v0.22.0/go.mod h1:EH4yDYeNoaTqn/8yCWQmfNB78VHfGX2Jt2bvnvzBlGM=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.22.0 h1:TjqELdtCtlOJQrTnXd2y+RP6wXKZUnnJer0HR0CSo18=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.22.0/go.mod h1:KjqwX4uJNaj479ZjFpADOMJKOM4rBXq4kN7nbeuGKrY=
go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo=
go.opentelemetry.io/otel v1.0.0-RC2 h1:SHhxSjB+omnGZPgGlKe+QMp3MyazcOHdQ8qwo89oKbg=
go.opentelemetry.io/otel v1.0.0-RC2/go.mod h1:w1thVQ7qbAy8MHb0IFj8a5Q2QU0l2ksf8u/CN8m3NOM=
go.opentelemetry.io/otel/exporters/jaeger v1.0.0-RC2 h1:RF0nWsIDpDBe+s06lkLxUw9CWQUAhO6hBSxxB7dz45s=
go.opentelemetry.io/otel/exporters/jaeger v1.0.0-RC2/go.mod h1:sZZqN3Vb0iT+NE6mZ1S7sNyH3t4PFk6ElK5TLGFBZ7E=
go.opentelemetry.io/otel/metric v0.20.0/go.mod h1:598I5tYlH1vzBjn+BTuhzTCSb/9debfNp6R3s7Pr1eU=
go.opentelemetry.io/otel/oteltest v0.20.0/go.mod h1:L7bgKf9ZB7qCwT9Up7i9/pn0PWIa9FqQ2IQ8LoxiGnw=
go.opentelemetry.io/otel/oteltest v1.0.0-RC2/go.mod h1:kiQ4tw5tAL4JLTbcOYwK1CWI1HkT5aiLzHovgOVnz/A=
go.opentelemetry.io/otel/sdk v1.0.0-RC2 h1:ROuteeSCBaZNjiT9JcFzZepmInDvLktR28Y6qKo8bCs=
go.opentelemetry.io/otel/sdk v1.0.0-RC2/go.mod h1:fgwHyiDn4e5k40TD9VX243rOxXR+jzsWBZYA2P5jpEw=
go.opentelemetry.io/otel/trace v0.20.0/go.mod h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16gUEi0Nf1iBdgw=
go.opentelemetry.io/otel/trace v1.0.0-RC2 h1:dunAP0qDULMIT82atj34m5RgvsIK6LcsXf1c/MsYg1w=
go.opentelemetry.io/otel/trace v1.0.0-RC2/go.mod h1:JPQ+z6nNw9mqEGT8o3eoPTdnNI+Aj5JcxEsVGREIAy4=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
Expand Down Expand Up @@ -754,6 +771,8 @@ golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da h1:b3NXsE2LusjYGGjL5bxEVZZORm/YEFFrWFjR8eFrw/c=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7 h1:iGu644GcxtEcrInvDsQRCwJjtCIOlT2V7IRt6ah2Whw=
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
Expand Down
9 changes: 2 additions & 7 deletions internal/grpc/interceptors/appctx/appctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,13 @@ import (

"github.com/cs3org/reva/pkg/appctx"
"github.com/rs/zerolog"
"go.opencensus.io/trace"
"google.golang.org/grpc"
)

// NewUnary returns a new unary interceptor that creates the application context.
func NewUnary(log zerolog.Logger) grpc.UnaryServerInterceptor {
interceptor := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
span := trace.FromContext(ctx)
sub := log.With().Str("traceid", span.SpanContext().TraceID.String()).Logger()
ctx = appctx.WithLogger(ctx, &sub)
ctx = appctx.WithLogger(ctx, &log)
res, err := handler(ctx, req)
return res, err
}
Expand All @@ -43,9 +40,7 @@ func NewUnary(log zerolog.Logger) grpc.UnaryServerInterceptor {
// that creates the application context.
func NewStream(log zerolog.Logger) grpc.StreamServerInterceptor {
interceptor := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
span := trace.FromContext(ss.Context())
sub := log.With().Str("traceid", span.SpanContext().TraceID.String()).Logger()
ctx := appctx.WithLogger(ss.Context(), &sub)
ctx := appctx.WithLogger(ss.Context(), &log)
wrapped := newWrappedServerStream(ctx, ss)
err := handler(srv, wrapped)
return err
Expand Down
14 changes: 0 additions & 14 deletions internal/grpc/interceptors/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/cs3org/reva/pkg/utils"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"go.opencensus.io/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -80,12 +79,9 @@ func NewUnary(m map[string]interface{}, unprotected []string) (grpc.UnaryServerI
}

interceptor := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
ctx, span := trace.StartSpan(ctx, "auth")
defer span.End()
log := appctx.GetLogger(ctx)

if utils.Skip(info.FullMethod, unprotected) {
span.AddAttributes(trace.BoolAttribute("auth_enabled", false))
log.Debug().Str("method", info.FullMethod).Msg("skipping auth")

// If a token is present, set it anyway, as we might need the user info
Expand All @@ -100,8 +96,6 @@ func NewUnary(m map[string]interface{}, unprotected []string) (grpc.UnaryServerI
return handler(ctx, req)
}

span.AddAttributes(trace.BoolAttribute("auth_enabled", true))

tkn, ok := ctxpkg.ContextGetToken(ctx)

if !ok || tkn == "" {
Expand All @@ -116,14 +110,6 @@ func NewUnary(m map[string]interface{}, unprotected []string) (grpc.UnaryServerI
return nil, status.Errorf(codes.PermissionDenied, "auth: core access token is invalid")
}

// store user and core access token in context.
span.AddAttributes(
trace.StringAttribute("id.idp", u.Id.Idp),
trace.StringAttribute("id.opaque_id", u.Id.OpaqueId),
trace.StringAttribute("username", u.Username),
trace.StringAttribute("token", tkn))
span.AddAttributes(trace.StringAttribute("user", u.String()), trace.StringAttribute("token", tkn))

ctx = ctxpkg.ContextSetUser(ctx, u)
return handler(ctx, req)
}
Expand Down
8 changes: 6 additions & 2 deletions internal/grpc/services/gateway/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"sync"
"time"

rtrace "github.com/cs3org/reva/pkg/trace"

collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1"
types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"

Expand Down Expand Up @@ -849,15 +851,17 @@ func (s *svc) Delete(ctx context.Context, req *provider.DeleteRequest) (*provide
}, nil
}

ctx, span := rtrace.Provider.Tracer("reva").Start(ctx, "Delete")
defer span.End()

if !s.inSharedFolder(ctx, p) {
return s.delete(ctx, req)
}

if s.isSharedFolder(ctx, p) {
// TODO(labkode): deleting share names should be allowed, means unmounting.
log.Debug().Msgf("path:%s points to shared folder or share name", p)
err := errtypes.BadRequest("gateway: cannot delete share folder or share name: path=" + p)
log.Err(err).Msg("gateway: error creating container")
span.RecordError(err)
return &provider.DeleteResponse{
Status: status.NewInvalidArg(ctx, "path points to share folder or share name"),
}, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ import (
"github.com/cs3org/reva/pkg/rgrpc"
"github.com/cs3org/reva/pkg/rgrpc/status"
"github.com/cs3org/reva/pkg/rgrpc/todo/pool"
rtrace "github.com/cs3org/reva/pkg/trace"
"github.com/cs3org/reva/pkg/utils"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"go.opencensus.io/trace"
"go.opentelemetry.io/otel/attribute"
"google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
"google.golang.org/grpc/codes"
gstatus "google.golang.org/grpc/status"
)

Expand Down Expand Up @@ -305,12 +306,13 @@ func (s *service) DeleteStorageSpace(ctx context.Context, req *provider.DeleteSt
}

func (s *service) CreateContainer(ctx context.Context, req *provider.CreateContainerRequest) (*provider.CreateContainerResponse, error) {
ctx, span := trace.StartSpan(ctx, "CreateContainer")
ctx, span := rtrace.Provider.Tracer("publicstorageprovider").Start(ctx, "CreateContainer")
defer span.End()

span.AddAttributes(
trace.StringAttribute("ref", req.Ref.String()),
)
span.SetAttributes(attribute.KeyValue{
Key: "reference",
Value: attribute.StringValue(req.Ref.String()),
})

cs3Ref, _, ls, st, err := s.translatePublicRefToCS3Ref(ctx, req.Ref)
switch {
Expand Down Expand Up @@ -344,12 +346,13 @@ func (s *service) CreateContainer(ctx context.Context, req *provider.CreateConta
}

func (s *service) Delete(ctx context.Context, req *provider.DeleteRequest) (*provider.DeleteResponse, error) {
ctx, span := trace.StartSpan(ctx, "Delete")
ctx, span := rtrace.Provider.Tracer("publicstorageprovider").Start(ctx, "Delete")
defer span.End()

span.AddAttributes(
trace.StringAttribute("ref", req.Ref.String()),
)
span.SetAttributes(attribute.KeyValue{
Key: "reference",
Value: attribute.StringValue(req.Ref.String()),
})

cs3Ref, _, ls, st, err := s.translatePublicRefToCS3Ref(ctx, req.Ref)
switch {
Expand Down Expand Up @@ -383,12 +386,18 @@ func (s *service) Delete(ctx context.Context, req *provider.DeleteRequest) (*pro
}

func (s *service) Move(ctx context.Context, req *provider.MoveRequest) (*provider.MoveResponse, error) {
ctx, span := trace.StartSpan(ctx, "Move")
ctx, span := rtrace.Provider.Tracer("publicstorageprovider").Start(ctx, "Move")
defer span.End()

span.AddAttributes(
trace.StringAttribute("source", req.Source.String()),
trace.StringAttribute("destination", req.Destination.String()),
span.SetAttributes(
attribute.KeyValue{
Key: "source",
Value: attribute.StringValue(req.Source.String()),
},
attribute.KeyValue{
Key: "destination",
Value: attribute.StringValue(req.Destination.String()),
},
)

cs3RefSource, tknSource, ls, st, err := s.translatePublicRefToCS3Ref(ctx, req.Source)
Expand Down Expand Up @@ -440,12 +449,14 @@ func (s *service) Move(ctx context.Context, req *provider.MoveRequest) (*provide
}

func (s *service) Stat(ctx context.Context, req *provider.StatRequest) (*provider.StatResponse, error) {
ctx, span := trace.StartSpan(ctx, "Stat")
ctx, span := rtrace.Provider.Tracer("publicstorageprovider").Start(ctx, "Stat")
defer span.End()

span.AddAttributes(
trace.StringAttribute("ref", req.Ref.String()),
)
span.SetAttributes(
attribute.KeyValue{
Key: "source",
Value: attribute.StringValue(req.Ref.String()),
})

tkn, relativePath, err := s.unwrap(ctx, req.Ref)
if err != nil {
Expand Down
Loading