diff --git a/beater/grpcauth.go b/beater/grpcauth.go deleted file mode 100644 index cd29bbfd6c4..00000000000 --- a/beater/grpcauth.go +++ /dev/null @@ -1,78 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package beater - -import ( - "context" - "strings" - - "github.com/elastic/apm-server/beater/authorization" - "github.com/elastic/apm-server/beater/headers" - - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/metadata" - "google.golang.org/grpc/status" -) - -// newAuthUnaryServerInterceptor returns a grpc.UnaryServerInterceptor which -// performs per-RPC auth using "Authorization" metadata for OpenTelemetry methods. -// -// TODO(axw) when we get rid of the standalone Jaeger port move Jaeger auth -// handling to here, possibly by dispatching to a callback based on the method -// name and req. -func newAuthUnaryServerInterceptor(builder *authorization.Builder) grpc.UnaryServerInterceptor { - authHandler := builder.ForPrivilege(authorization.PrivilegeEventWrite.Action) - return func( - ctx context.Context, - req interface{}, - info *grpc.UnaryServerInfo, - handler grpc.UnaryHandler, - ) (resp interface{}, err error) { - if strings.HasPrefix(info.FullMethod, "/opentelemetry") { - auth, err := verifyGRPCAuthorization(ctx, authHandler) - if err != nil { - return nil, err - } - ctx = authorization.ContextWithAuthorization(ctx, auth) - } - return handler(ctx, req) - } -} - -func verifyGRPCAuthorization(ctx context.Context, authHandler *authorization.Handler) (authorization.Authorization, error) { - var authHeader string - if md, ok := metadata.FromIncomingContext(ctx); ok { - if values := md.Get(headers.Authorization); len(values) > 0 { - authHeader = values[0] - } - } - auth := authHandler.AuthorizationFor(authorization.ParseAuthorizationHeader(authHeader)) - result, err := auth.AuthorizedFor(ctx, authorization.Resource{}) - if err != nil { - return nil, err - } - if !result.Authorized { - message := "unauthorized" - if result.Reason != "" { - message = result.Reason - } - return nil, status.Error(codes.Unauthenticated, message) - } - return auth, nil -} diff --git a/beater/interceptors/authorization.go b/beater/interceptors/authorization.go new file mode 100644 index 00000000000..5076381c806 --- /dev/null +++ b/beater/interceptors/authorization.go @@ -0,0 +1,89 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package interceptors + +import ( + "context" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + + "github.com/elastic/apm-server/beater/authorization" + "github.com/elastic/apm-server/beater/headers" +) + +// MethodAuthorizationHandler is a function type for obtaining an Authorization +// for a gRPC method call. This is used to authorize gRPC method calls by extracting +// authentication tokens from incoming context metadata or from the request payload. +type MethodAuthorizationHandler func(ctx context.Context, req interface{}) authorization.Authorization + +// Authorization returns a grpc.UnaryServerInterceptor that ensures method +// calls are authorized before passing on to the next handler. +// +// Authorization is performed using a MethodAuthorizationHandler from the +// combined map parameters, keyed on the full gRPC method name (info.FullMethod). +// If there is no handler defined for the method, authorization fails. +func Authorization(methodHandlers ...map[string]MethodAuthorizationHandler) grpc.UnaryServerInterceptor { + combinedMethodHandlers := make(map[string]MethodAuthorizationHandler) + for _, methodHandlers := range methodHandlers { + for method, handler := range methodHandlers { + combinedMethodHandlers[method] = handler + } + } + return func( + ctx context.Context, + req interface{}, + info *grpc.UnaryServerInfo, + handler grpc.UnaryHandler, + ) (interface{}, error) { + authHandler, ok := combinedMethodHandlers[info.FullMethod] + if !ok { + return nil, status.Errorf(codes.Unauthenticated, "no auth method defined for %q", info.FullMethod) + } + auth := authHandler(ctx, req) + authResult, err := auth.AuthorizedFor(ctx, authorization.Resource{}) + if err != nil { + return nil, err + } else if !authResult.Authorized { + message := "unauthorized" + if authResult.Reason != "" { + message = authResult.Reason + } + return nil, status.Error(codes.Unauthenticated, message) + } + ctx = authorization.ContextWithAuthorization(ctx, auth) + return handler(ctx, req) + } +} + +// MetadataMethodAuthorizationHandler returns a MethodAuthorizationHandler +// that extracts authentication parameters from the "authorization" metadata in ctx, +// calling authHandler.AuthorizedFor. +func MetadataMethodAuthorizationHandler(authHandler *authorization.Handler) MethodAuthorizationHandler { + return func(ctx context.Context, req interface{}) authorization.Authorization { + var authHeader string + if md, ok := metadata.FromIncomingContext(ctx); ok { + if values := md.Get(headers.Authorization); len(values) > 0 { + authHeader = values[0] + } + } + return authHandler.AuthorizationFor(authorization.ParseAuthorizationHeader(authHeader)) + } +} diff --git a/beater/interceptors/authorization_test.go b/beater/interceptors/authorization_test.go new file mode 100644 index 00000000000..d923b2daf54 --- /dev/null +++ b/beater/interceptors/authorization_test.go @@ -0,0 +1,141 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package interceptors_test + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + + "github.com/elastic/apm-server/beater/authorization" + "github.com/elastic/apm-server/beater/config" + "github.com/elastic/apm-server/beater/interceptors" +) + +func TestAuthorization(t *testing.T) { + type contextKey struct{} + origContext := context.WithValue(context.Background(), contextKey{}, 123) + origReq := "grpc_request" + origResp := "grpc_response" + origErr := errors.New("handler error") + + authorizedResult := authorization.Result{Authorized: true} + anonymousResult := authorization.Result{Authorized: true, Anonymous: true} + unauthorizedResult := authorization.Result{Authorized: false, Reason: "no particular reason"} + + authorized := authorizationFunc(func(ctx context.Context, _ authorization.Resource) (authorization.Result, error) { + assert.Equal(t, 123, ctx.Value(contextKey{})) + return authorizedResult, nil + }) + anonymous := authorizationFunc(func(ctx context.Context, _ authorization.Resource) (authorization.Result, error) { + assert.Equal(t, 123, ctx.Value(contextKey{})) + return anonymousResult, nil + }) + unauthorized := authorizationFunc(func(ctx context.Context, _ authorization.Resource) (authorization.Result, error) { + assert.Equal(t, 123, ctx.Value(contextKey{})) + return unauthorizedResult, nil + }) + authError := authorizationFunc(func(ctx context.Context, _ authorization.Resource) (authorization.Result, error) { + assert.Equal(t, 123, ctx.Value(contextKey{})) + return authorization.Result{}, errors.New("error checking authorization") + }) + + makeMethodAuthorizationHandler := func(auth authorization.Authorization) interceptors.MethodAuthorizationHandler { + return func(ctx context.Context, req interface{}) authorization.Authorization { + require.Equal(t, origReq, req) + return auth + } + } + + interceptor := interceptors.Authorization( + map[string]interceptors.MethodAuthorizationHandler{ + "authorized": makeMethodAuthorizationHandler(authorized), + "anonymous": makeMethodAuthorizationHandler(anonymous), + }, + map[string]interceptors.MethodAuthorizationHandler{ + "unauthorized": makeMethodAuthorizationHandler(unauthorized), + "authError": makeMethodAuthorizationHandler(authError), + }, + ) + + type test struct { + method string + expectResult authorization.Result + expectResp interface{} + expectErr error + } + for _, test := range []test{{ + method: "authorized", + expectResult: authorizedResult, + expectResp: origResp, + expectErr: origErr, + }, { + method: "anonymous", + expectResult: anonymousResult, + expectResp: origResp, + expectErr: origErr, + }, { + method: "unauthorized", + expectResp: nil, + expectErr: status.Error(codes.Unauthenticated, "no particular reason"), + }, { + method: "authError", + expectResp: nil, + expectErr: errors.New("error checking authorization"), + }} { + t.Run(test.method, func(t *testing.T) { + var authorizedForResult authorization.Result + var authorizedForErr error + next := func(ctx context.Context, req interface{}) (interface{}, error) { + authorizedForResult, authorizedForErr = authorization.AuthorizedFor(ctx, authorization.Resource{}) + return origResp, origErr + } + resp, err := interceptor(origContext, origReq, &grpc.UnaryServerInfo{FullMethod: test.method}, next) + assert.Equal(t, test.expectErr, err) + assert.Equal(t, test.expectResp, resp) + assert.Equal(t, test.expectResult, authorizedForResult) + assert.NoError(t, authorizedForErr) + }) + } +} + +func TestMetadataMethodAuthorizationHandler(t *testing.T) { + authBuilder, _ := authorization.NewBuilder(config.AgentAuth{SecretToken: "abc123"}) + authHandler := authBuilder.ForPrivilege(authorization.PrivilegeEventWrite.Action) + methodHandler := interceptors.MetadataMethodAuthorizationHandler(authHandler) + + ctx := context.Background() + ctx = metadata.NewIncomingContext(ctx, metadata.Pairs("authorization", "Bearer abc123")) + auth := methodHandler(ctx, nil) + result, err := auth.AuthorizedFor(ctx, authorization.Resource{}) + require.NoError(t, err) + assert.Equal(t, authorization.Result{Authorized: true}, result) +} + +type authorizationFunc func(context.Context, authorization.Resource) (authorization.Result, error) + +func (f authorizationFunc) AuthorizedFor(ctx context.Context, resource authorization.Resource) (authorization.Result, error) { + return f(ctx, resource) +} diff --git a/beater/jaeger/common.go b/beater/jaeger/common.go index 69d80df18c3..f385ac42c0a 100644 --- a/beater/jaeger/common.go +++ b/beater/jaeger/common.go @@ -21,18 +21,14 @@ import ( "context" "github.com/jaegertracing/jaeger/model" - "github.com/pkg/errors" "go.opentelemetry.io/collector/consumer" trjaeger "go.opentelemetry.io/collector/translator/trace/jaeger" "github.com/elastic/beats/v7/libbeat/monitoring" - "github.com/elastic/apm-server/beater/authorization" "github.com/elastic/apm-server/beater/request" ) -var errNotAuthorized = errors.New("not authorized") - type monitoringMap map[request.ResultID]*monitoring.Int func (m monitoringMap) inc(id request.ResultID) { @@ -58,36 +54,3 @@ func consumeBatch( traces := trjaeger.ProtoBatchToInternalTraces(batch) return consumer.ConsumeTraces(ctx, traces) } - -type authFunc func(context.Context, model.Batch) (context.Context, error) - -func noAuth(ctx context.Context, _ model.Batch) (context.Context, error) { - return ctx, nil -} - -func makeAuthFunc(authTag string, authHandler *authorization.Handler) authFunc { - return func(ctx context.Context, batch model.Batch) (context.Context, error) { - var kind, token string - for i, kv := range batch.Process.GetTags() { - if kv.Key != authTag { - continue - } - // Remove the auth tag. - batch.Process.Tags = append(batch.Process.Tags[:i], batch.Process.Tags[i+1:]...) - kind, token = authorization.ParseAuthorizationHeader(kv.VStr) - break - } - auth := authHandler.AuthorizationFor(kind, token) - result, err := auth.AuthorizedFor(ctx, authorization.Resource{}) - if !result.Authorized { - if err != nil { - return nil, errors.Wrap(err, errNotAuthorized.Error()) - } - // NOTE(axw) for now at least, we do not return result.Reason in the error message, - // as it refers to the "Authorization header" which is incorrect for Jaeger. - return nil, errNotAuthorized - } - ctx = authorization.ContextWithAuthorization(ctx, auth) - return ctx, nil - } -} diff --git a/beater/jaeger/grpc.go b/beater/jaeger/grpc.go index 4738fa0adbc..3a91eb9493e 100644 --- a/beater/jaeger/grpc.go +++ b/beater/jaeger/grpc.go @@ -27,13 +27,13 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/proto-gen/api_v2" "go.opentelemetry.io/collector/consumer" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/apm-server/agentcfg" + "github.com/elastic/apm-server/beater/authorization" + "github.com/elastic/apm-server/beater/interceptors" "github.com/elastic/apm-server/beater/request" "github.com/elastic/apm-server/processor/otel" ) @@ -60,9 +60,16 @@ const ( getSamplingStrategyFullMethod = "/jaeger.api_v2.SamplingManager/GetSamplingStrategy" ) +// MethodAuthorizationHandlers returns a map of all supported Jaeger/gRPC methods to authorization handlers. +func MethodAuthorizationHandlers(authBuilder *authorization.Builder, authTag string) map[string]interceptors.MethodAuthorizationHandler { + return map[string]interceptors.MethodAuthorizationHandler{ + postSpansFullMethod: postSpansMethodAuthorizationHandler(authBuilder, authTag), + getSamplingStrategyFullMethod: getSamplingStrategyMethodAuthorizationHandler(authBuilder), + } +} + // grpcCollector implements Jaeger api_v2 protocol for receiving tracing data type grpcCollector struct { - auth authFunc consumer consumer.Traces } @@ -78,11 +85,6 @@ func (c *grpcCollector) PostSpans(ctx context.Context, r *api_v2.PostSpansReques } func (c *grpcCollector) postSpans(ctx context.Context, batch model.Batch) error { - ctx, err := c.auth(ctx, batch) - if err != nil { - gRPCCollectorMonitoringMap.inc(request.IDResponseErrorsUnauthorized) - return status.Error(codes.Unauthenticated, err.Error()) - } return consumeBatch(ctx, batch, c.consumer, gRPCCollectorMonitoringMap) } @@ -128,8 +130,28 @@ func (s *grpcSampler) GetSamplingStrategy( } func (s *grpcSampler) fetchSamplingRate(ctx context.Context, service string) (float64, error) { - query := agentcfg.Query{Service: agentcfg.Service{Name: service}, - InsecureAgents: jaegerAgentPrefixes, MarkAsAppliedByAgent: newBool(true)} + // Only service, and not agent, is known for config queries. + // For anonymous/untrusted agents, we filter the results using + // query.InsecureAgents below. + authResource := authorization.Resource{ServiceName: service} + authResult, err := authorization.AuthorizedFor(ctx, authResource) + if err != nil { + return 0, err + } + if !authResult.Authorized { + err := authorization.ErrUnauthorized + if authResult.Reason != "" { + err = fmt.Errorf("%w: %s", err, authResult.Reason) + } + return 0, err + } + + markAsAppliedByAgent := true + query := agentcfg.Query{ + Service: agentcfg.Service{Name: service}, + InsecureAgents: jaegerAgentPrefixes, + MarkAsAppliedByAgent: &markAsAppliedByAgent, + } result, err := s.fetcher.Fetch(ctx, query) if err != nil { gRPCSamplingMonitoringMap.inc(request.IDResponseErrorsServiceUnavailable) @@ -169,4 +191,28 @@ func checkValidationError(err *agentcfg.ValidationError) error { } } -func newBool(b bool) *bool { return &b } +func postSpansMethodAuthorizationHandler(authBuilder *authorization.Builder, authTag string) interceptors.MethodAuthorizationHandler { + authHandler := authBuilder.ForPrivilege(authorization.PrivilegeEventWrite.Action) + return func(ctx context.Context, req interface{}) authorization.Authorization { + postSpansRequest := req.(*api_v2.PostSpansRequest) + batch := &postSpansRequest.Batch + var kind, token string + for i, kv := range batch.Process.GetTags() { + if kv.Key != authTag { + continue + } + // Remove the auth tag. + batch.Process.Tags = append(batch.Process.Tags[:i], batch.Process.Tags[i+1:]...) + kind, token = authorization.ParseAuthorizationHeader(kv.VStr) + break + } + return authHandler.AuthorizationFor(kind, token) + } +} + +func getSamplingStrategyMethodAuthorizationHandler(authBuilder *authorization.Builder) interceptors.MethodAuthorizationHandler { + authHandler := authBuilder.ForPrivilege(authorization.PrivilegeAgentConfigRead.Action) + return func(ctx context.Context, req interface{}) authorization.Authorization { + return authHandler.AuthorizationFor("", "") + } +} diff --git a/beater/jaeger/grpc_test.go b/beater/jaeger/grpc_test.go index d2de61bf7fd..01f191e1017 100644 --- a/beater/jaeger/grpc_test.go +++ b/beater/jaeger/grpc_test.go @@ -24,26 +24,23 @@ import ( "testing" "time" - "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/proto-gen/api_v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/translator/trace/jaeger" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/apm-server/agentcfg" + "github.com/elastic/apm-server/beater/authorization" "github.com/elastic/apm-server/beater/beatertest" + "github.com/elastic/apm-server/beater/config" "github.com/elastic/apm-server/kibana/kibanatest" ) -type authKey struct{} - func TestGRPCCollector_PostSpans(t *testing.T) { for name, tc := range map[string]testGRPCCollector{ "empty request": { @@ -54,21 +51,6 @@ func TestGRPCCollector_PostSpans(t *testing.T) { consumerErr: errors.New("consumer failed"), expectedErr: errors.New("consumer failed"), }, - "auth fails": { - authErr: errors.New("oh noes"), - expectedErr: status.Error(codes.Unauthenticated, "oh noes"), - }, - "auth context": { - auth: func(ctx context.Context, batch model.Batch) (context.Context, error) { - return context.WithValue(ctx, authKey{}, 123), nil - }, - consumer: func(ctx context.Context, td pdata.Traces) error { - if ctx.Value(authKey{}) != 123 { - panic("auth context not propagated to consumer") - } - return nil - }, - }, } { t.Run(name, func(t *testing.T) { tc.setup(t) @@ -89,8 +71,6 @@ func TestGRPCCollector_PostSpans(t *testing.T) { type testGRPCCollector struct { request *api_v2.PostSpansRequest consumer tracesConsumerFunc - auth authFunc - authErr error consumerErr error collector *grpcCollector @@ -125,12 +105,7 @@ func (tc *testGRPCCollector) setup(t *testing.T) { return tc.consumerErr } } - if tc.auth == nil { - tc.auth = func(ctx context.Context, _ model.Batch) (context.Context, error) { - return ctx, tc.authErr - } - } - tc.collector = &grpcCollector{tc.auth, tc.consumer} + tc.collector = &grpcCollector{tc.consumer} } type tracesConsumerFunc func(ctx context.Context, td pdata.Traces) error @@ -178,7 +153,12 @@ func TestGRPCSampler_GetSamplingStrategy(t *testing.T) { require.NoError(t, logp.DevelopmentSetup(logp.ToObserverOutput())) tc.setup() params := &api_v2.SamplingStrategyParameters{ServiceName: "serviceA"} - resp, err := tc.sampler.GetSamplingStrategy(context.Background(), params) + + ctx := context.Background() + authBuilder, _ := authorization.NewBuilder(config.AgentAuth{}) + authHandler := authBuilder.ForPrivilege(authorization.PrivilegeAgentConfigRead.Action) + ctx = authorization.ContextWithAuthorization(ctx, authHandler.AuthorizationFor("", "")) + resp, err := tc.sampler.GetSamplingStrategy(ctx, params) // assert sampling response if tc.expectedErrMsg != "" { diff --git a/beater/jaeger/server.go b/beater/jaeger/server.go index 42ef75b226d..59ec606b1a9 100644 --- a/beater/jaeger/server.go +++ b/beater/jaeger/server.go @@ -79,50 +79,42 @@ func NewServer( srv := &Server{logger: logger} if cfg.JaegerConfig.GRPC.Enabled { - var authBuilder *authorization.Builder + var agentAuth config.AgentAuth if cfg.JaegerConfig.GRPC.AuthTag != "" { // By default auth is not required for Jaeger - users // must explicitly specify which tag to use. - // TODO(axw) share auth builder with beater/api. - var err error - authBuilder, err = authorization.NewBuilder(cfg.AgentAuth) - if err != nil { - return nil, err - } + agentAuth = cfg.AgentAuth } - - // TODO(axw) should the listener respect cfg.MaxConnections? - grpcListener, err := net.Listen("tcp", cfg.JaegerConfig.GRPC.Host) + authBuilder, err := authorization.NewBuilder(agentAuth) if err != nil { return nil, err } + logger = logger.Named(logs.Jaeger) - grpcOptions := []grpc.ServerOption{ - grpc.ChainUnaryInterceptor( - apmgrpc.NewUnaryServerInterceptor( - apmgrpc.WithRecovery(), - apmgrpc.WithTracer(tracer), - ), - interceptors.Logging(logger), - interceptors.Metrics(logger, RegistryMonitoringMaps), - interceptors.Timeout(), + grpcInterceptors := []grpc.UnaryServerInterceptor{ + apmgrpc.NewUnaryServerInterceptor( + apmgrpc.WithRecovery(), + apmgrpc.WithTracer(tracer), ), + interceptors.Logging(logger), + interceptors.Metrics(logger, RegistryMonitoringMaps), + interceptors.Timeout(), + interceptors.Authorization(MethodAuthorizationHandlers(authBuilder, cfg.JaegerConfig.GRPC.AuthTag)), } + + // TODO(axw) should the listener respect cfg.MaxConnections? + grpcListener, err := net.Listen("tcp", cfg.JaegerConfig.GRPC.Host) + if err != nil { + return nil, err + } + grpcOptions := []grpc.ServerOption{grpc.ChainUnaryInterceptor(grpcInterceptors...)} if cfg.JaegerConfig.GRPC.TLS != nil { creds := credentials.NewTLS(cfg.JaegerConfig.GRPC.TLS) grpcOptions = append(grpcOptions, grpc.Creds(creds)) } srv.grpc.server = grpc.NewServer(grpcOptions...) srv.grpc.listener = grpcListener - - RegisterGRPCServices( - srv.grpc.server, - authBuilder, - cfg.JaegerConfig.GRPC.AuthTag, - logger, - processor, - fetcher, - ) + RegisterGRPCServices(srv.grpc.server, logger, processor, fetcher) } if cfg.JaegerConfig.HTTP.Enabled { // TODO(axw) should the listener respect cfg.MaxConnections? @@ -149,18 +141,12 @@ func NewServer( // RegisterGRPCServices registers Jaeger gRPC services with srv. func RegisterGRPCServices( srv *grpc.Server, - authBuilder *authorization.Builder, - authTag string, logger *logp.Logger, processor model.BatchProcessor, fetcher agentcfg.Fetcher, ) { - auth := noAuth - if authTag != "" { - auth = makeAuthFunc(authTag, authBuilder.ForPrivilege(authorization.PrivilegeEventWrite.Action)) - } traceConsumer := &otel.Consumer{Processor: processor} - api_v2.RegisterCollectorServiceServer(srv, &grpcCollector{auth, traceConsumer}) + api_v2.RegisterCollectorServiceServer(srv, &grpcCollector{traceConsumer}) api_v2.RegisterSamplingManagerServer(srv, &grpcSampler{logger, fetcher}) } diff --git a/beater/otlp/grpc.go b/beater/otlp/grpc.go index 2ab6f31218e..74b574eeeaa 100644 --- a/beater/otlp/grpc.go +++ b/beater/otlp/grpc.go @@ -25,6 +25,8 @@ import ( "go.opentelemetry.io/collector/receiver/otlpreceiver" "google.golang.org/grpc" + "github.com/elastic/apm-server/beater/authorization" + "github.com/elastic/apm-server/beater/interceptors" "github.com/elastic/apm-server/beater/request" "github.com/elastic/apm-server/model" "github.com/elastic/apm-server/processor/otel" @@ -58,6 +60,17 @@ func init() { monitoring.NewFunc(gRPCMetricsRegistry, "consumer", collectMetricsMonitoring, monitoring.Report) } +// MethodAuthorizationHandlers returns a map of all supported OTLP/gRPC methods to authorization handlers. +func MethodAuthorizationHandlers(authBuilder *authorization.Builder) map[string]interceptors.MethodAuthorizationHandler { + eventWriteMethodAuthorizationHandler := interceptors.MetadataMethodAuthorizationHandler( + authBuilder.ForPrivilege(authorization.PrivilegeEventWrite.Action), + ) + return map[string]interceptors.MethodAuthorizationHandler{ + metricsFullMethod: eventWriteMethodAuthorizationHandler, + tracesFullMethod: eventWriteMethodAuthorizationHandler, + } +} + // RegisterGRPCServices registers OTLP consumer services with the given gRPC server. func RegisterGRPCServices(grpcServer *grpc.Server, processor model.BatchProcessor) error { consumer := &otel.Consumer{Processor: processor} diff --git a/beater/server.go b/beater/server.go index 11179ebb31b..1d6ed797673 100644 --- a/beater/server.go +++ b/beater/server.go @@ -163,7 +163,10 @@ func newGRPCServer( } apmInterceptor := apmgrpc.NewUnaryServerInterceptor(apmgrpc.WithRecovery(), apmgrpc.WithTracer(tracer)) - authInterceptor := newAuthUnaryServerInterceptor(authBuilder) + authInterceptor := interceptors.Authorization( + otlp.MethodAuthorizationHandlers(authBuilder), + jaeger.MethodAuthorizationHandlers(authBuilder, jaeger.ElasticAuthTag), + ) // Note that we intentionally do not use a grpc.Creds ServerOption // even if TLS is enabled, as TLS is handled by the net/http server. @@ -196,7 +199,7 @@ func newGRPCServer( batchProcessor, } - jaeger.RegisterGRPCServices(srv, authBuilder, jaeger.ElasticAuthTag, logger, batchProcessor, agentcfgFetcher) + jaeger.RegisterGRPCServices(srv, logger, batchProcessor, agentcfgFetcher) if err := otlp.RegisterGRPCServices(srv, batchProcessor); err != nil { return nil, err } diff --git a/tests/system/test_jaeger.py b/tests/system/test_jaeger.py index 058d556fcc8..5c74d24f7d5 100644 --- a/tests/system/test_jaeger.py +++ b/tests/system/test_jaeger.py @@ -93,7 +93,7 @@ def test_jaeger_unauthorized(self): ) stdout, stderr = proc.communicate() self.assertNotEqual(proc.returncode, 0) - self.assertRegex(stderr.decode("utf-8"), "not authorized") + self.assertRegex(stderr.decode("utf-8"), "Unauthenticated") def test_jaeger_authorized(self): """