From 92335c5b479390569f7c5464edadef052f0a436d Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Wed, 23 Jun 2021 09:18:24 +0800 Subject: [PATCH 1/3] Introduce shared gRPC authorization interceptor (#5515) * Introduce shared gRPC authorization interceptor Introduce a new gRPC authorization interceptor which is used for both OTLP and Jaeger. This new interceptor dispatches to method-specific handlers. All OTLP methods use the "authorization" metadata, whereas Jaeger has different approaches depending on the method: extract process tags for PostSpans, and anonymous auth only for GetSamplingStrategy. * beater/otlp: revert metrics change * tests/system: update Jaeger system test The response error message has changed, update the test to expect the new one. * beater/jaeger: fix logger name --- beater/grpcauth.go | 78 ------------ beater/interceptors/authorization.go | 89 ++++++++++++++ beater/interceptors/authorization_test.go | 141 ++++++++++++++++++++++ beater/jaeger/common.go | 37 ------ beater/jaeger/grpc.go | 68 +++++++++-- beater/jaeger/grpc_test.go | 38 ++---- beater/jaeger/server.go | 56 ++++----- beater/otlp/grpc.go | 13 ++ beater/server.go | 7 +- tests/system/test_jaeger.py | 2 +- 10 files changed, 336 insertions(+), 193 deletions(-) delete mode 100644 beater/grpcauth.go create mode 100644 beater/interceptors/authorization.go create mode 100644 beater/interceptors/authorization_test.go diff --git a/beater/grpcauth.go b/beater/grpcauth.go deleted file mode 100644 index cd29bbfd6c4..00000000000 --- a/beater/grpcauth.go +++ /dev/null @@ -1,78 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package beater - -import ( - "context" - "strings" - - "github.com/elastic/apm-server/beater/authorization" - "github.com/elastic/apm-server/beater/headers" - - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/metadata" - "google.golang.org/grpc/status" -) - -// newAuthUnaryServerInterceptor returns a grpc.UnaryServerInterceptor which -// performs per-RPC auth using "Authorization" metadata for OpenTelemetry methods. -// -// TODO(axw) when we get rid of the standalone Jaeger port move Jaeger auth -// handling to here, possibly by dispatching to a callback based on the method -// name and req. 
-func newAuthUnaryServerInterceptor(builder *authorization.Builder) grpc.UnaryServerInterceptor { - authHandler := builder.ForPrivilege(authorization.PrivilegeEventWrite.Action) - return func( - ctx context.Context, - req interface{}, - info *grpc.UnaryServerInfo, - handler grpc.UnaryHandler, - ) (resp interface{}, err error) { - if strings.HasPrefix(info.FullMethod, "/opentelemetry") { - auth, err := verifyGRPCAuthorization(ctx, authHandler) - if err != nil { - return nil, err - } - ctx = authorization.ContextWithAuthorization(ctx, auth) - } - return handler(ctx, req) - } -} - -func verifyGRPCAuthorization(ctx context.Context, authHandler *authorization.Handler) (authorization.Authorization, error) { - var authHeader string - if md, ok := metadata.FromIncomingContext(ctx); ok { - if values := md.Get(headers.Authorization); len(values) > 0 { - authHeader = values[0] - } - } - auth := authHandler.AuthorizationFor(authorization.ParseAuthorizationHeader(authHeader)) - result, err := auth.AuthorizedFor(ctx, authorization.Resource{}) - if err != nil { - return nil, err - } - if !result.Authorized { - message := "unauthorized" - if result.Reason != "" { - message = result.Reason - } - return nil, status.Error(codes.Unauthenticated, message) - } - return auth, nil -} diff --git a/beater/interceptors/authorization.go b/beater/interceptors/authorization.go new file mode 100644 index 00000000000..5076381c806 --- /dev/null +++ b/beater/interceptors/authorization.go @@ -0,0 +1,89 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package interceptors + +import ( + "context" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + + "github.com/elastic/apm-server/beater/authorization" + "github.com/elastic/apm-server/beater/headers" +) + +// MethodAuthorizationHandler is a function type for obtaining an Authorization +// for a gRPC method call. This is used to authorize gRPC method calls by extracting +// authentication tokens from incoming context metadata or from the request payload. +type MethodAuthorizationHandler func(ctx context.Context, req interface{}) authorization.Authorization + +// Authorization returns a grpc.UnaryServerInterceptor that ensures method +// calls are authorized before passing on to the next handler. +// +// Authorization is performed using a MethodAuthorizationHandler from the +// combined map parameters, keyed on the full gRPC method name (info.FullMethod). +// If there is no handler defined for the method, authorization fails. 
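+//
+// For example (an illustrative sketch, mirroring the beater/server.go and
+// beater/jaeger/server.go changes later in this patch), the interceptor is
+// installed when constructing a gRPC server, combining handler maps from
+// multiple packages:
+//
+//	srv := grpc.NewServer(grpc.ChainUnaryInterceptor(
+//		interceptors.Authorization(
+//			otlp.MethodAuthorizationHandlers(authBuilder),
+//			jaeger.MethodAuthorizationHandlers(authBuilder, jaeger.ElasticAuthTag),
+//		),
+//	))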
+func Authorization(methodHandlers ...map[string]MethodAuthorizationHandler) grpc.UnaryServerInterceptor { + combinedMethodHandlers := make(map[string]MethodAuthorizationHandler) + for _, methodHandlers := range methodHandlers { + for method, handler := range methodHandlers { + combinedMethodHandlers[method] = handler + } + } + return func( + ctx context.Context, + req interface{}, + info *grpc.UnaryServerInfo, + handler grpc.UnaryHandler, + ) (interface{}, error) { + authHandler, ok := combinedMethodHandlers[info.FullMethod] + if !ok { + return nil, status.Errorf(codes.Unauthenticated, "no auth method defined for %q", info.FullMethod) + } + auth := authHandler(ctx, req) + authResult, err := auth.AuthorizedFor(ctx, authorization.Resource{}) + if err != nil { + return nil, err + } else if !authResult.Authorized { + message := "unauthorized" + if authResult.Reason != "" { + message = authResult.Reason + } + return nil, status.Error(codes.Unauthenticated, message) + } + ctx = authorization.ContextWithAuthorization(ctx, auth) + return handler(ctx, req) + } +} + +// MetadataMethodAuthorizationHandler returns a MethodAuthorizationHandler +// that extracts authentication parameters from the "authorization" metadata in ctx, +// calling authHandler.AuthorizedFor. +func MetadataMethodAuthorizationHandler(authHandler *authorization.Handler) MethodAuthorizationHandler { + return func(ctx context.Context, req interface{}) authorization.Authorization { + var authHeader string + if md, ok := metadata.FromIncomingContext(ctx); ok { + if values := md.Get(headers.Authorization); len(values) > 0 { + authHeader = values[0] + } + } + return authHandler.AuthorizationFor(authorization.ParseAuthorizationHeader(authHeader)) + } +} diff --git a/beater/interceptors/authorization_test.go b/beater/interceptors/authorization_test.go new file mode 100644 index 00000000000..d923b2daf54 --- /dev/null +++ b/beater/interceptors/authorization_test.go @@ -0,0 +1,141 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package interceptors_test + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + + "github.com/elastic/apm-server/beater/authorization" + "github.com/elastic/apm-server/beater/config" + "github.com/elastic/apm-server/beater/interceptors" +) + +func TestAuthorization(t *testing.T) { + type contextKey struct{} + origContext := context.WithValue(context.Background(), contextKey{}, 123) + origReq := "grpc_request" + origResp := "grpc_response" + origErr := errors.New("handler error") + + authorizedResult := authorization.Result{Authorized: true} + anonymousResult := authorization.Result{Authorized: true, Anonymous: true} + unauthorizedResult := authorization.Result{Authorized: false, Reason: "no particular reason"} + + authorized := authorizationFunc(func(ctx context.Context, _ authorization.Resource) (authorization.Result, error) { + assert.Equal(t, 123, ctx.Value(contextKey{})) + return authorizedResult, nil + }) + anonymous := authorizationFunc(func(ctx context.Context, _ authorization.Resource) (authorization.Result, error) { + assert.Equal(t, 123, ctx.Value(contextKey{})) + return anonymousResult, nil + }) + unauthorized := authorizationFunc(func(ctx context.Context, _ authorization.Resource) (authorization.Result, error) { + assert.Equal(t, 123, ctx.Value(contextKey{})) + return unauthorizedResult, nil + }) + authError := authorizationFunc(func(ctx context.Context, _ authorization.Resource) (authorization.Result, error) { + assert.Equal(t, 123, ctx.Value(contextKey{})) + return authorization.Result{}, errors.New("error checking authorization") + }) + + makeMethodAuthorizationHandler := func(auth authorization.Authorization) interceptors.MethodAuthorizationHandler { + return func(ctx context.Context, req interface{}) authorization.Authorization { + require.Equal(t, origReq, req) + return auth + } + } + + interceptor := interceptors.Authorization( + map[string]interceptors.MethodAuthorizationHandler{ + "authorized": makeMethodAuthorizationHandler(authorized), + "anonymous": makeMethodAuthorizationHandler(anonymous), + }, + map[string]interceptors.MethodAuthorizationHandler{ + "unauthorized": makeMethodAuthorizationHandler(unauthorized), + "authError": makeMethodAuthorizationHandler(authError), + }, + ) + + type test struct { + method string + expectResult authorization.Result + expectResp interface{} + expectErr error + } + for _, test := range []test{{ + method: "authorized", + expectResult: authorizedResult, + expectResp: origResp, + expectErr: origErr, + }, { + method: "anonymous", + expectResult: anonymousResult, + expectResp: origResp, + expectErr: origErr, + }, { + method: "unauthorized", + expectResp: nil, + expectErr: status.Error(codes.Unauthenticated, "no particular reason"), + }, { + method: "authError", + expectResp: nil, + expectErr: errors.New("error checking authorization"), + }} { + t.Run(test.method, func(t *testing.T) { + var authorizedForResult authorization.Result + var authorizedForErr error + next := func(ctx context.Context, req interface{}) (interface{}, error) { + authorizedForResult, authorizedForErr = authorization.AuthorizedFor(ctx, authorization.Resource{}) + return origResp, origErr + } + resp, err := interceptor(origContext, origReq, &grpc.UnaryServerInfo{FullMethod: test.method}, next) + assert.Equal(t, 
test.expectErr, err) + assert.Equal(t, test.expectResp, resp) + assert.Equal(t, test.expectResult, authorizedForResult) + assert.NoError(t, authorizedForErr) + }) + } +} + +func TestMetadataMethodAuthorizationHandler(t *testing.T) { + authBuilder, _ := authorization.NewBuilder(config.AgentAuth{SecretToken: "abc123"}) + authHandler := authBuilder.ForPrivilege(authorization.PrivilegeEventWrite.Action) + methodHandler := interceptors.MetadataMethodAuthorizationHandler(authHandler) + + ctx := context.Background() + ctx = metadata.NewIncomingContext(ctx, metadata.Pairs("authorization", "Bearer abc123")) + auth := methodHandler(ctx, nil) + result, err := auth.AuthorizedFor(ctx, authorization.Resource{}) + require.NoError(t, err) + assert.Equal(t, authorization.Result{Authorized: true}, result) +} + +type authorizationFunc func(context.Context, authorization.Resource) (authorization.Result, error) + +func (f authorizationFunc) AuthorizedFor(ctx context.Context, resource authorization.Resource) (authorization.Result, error) { + return f(ctx, resource) +} diff --git a/beater/jaeger/common.go b/beater/jaeger/common.go index 69d80df18c3..f385ac42c0a 100644 --- a/beater/jaeger/common.go +++ b/beater/jaeger/common.go @@ -21,18 +21,14 @@ import ( "context" "github.com/jaegertracing/jaeger/model" - "github.com/pkg/errors" "go.opentelemetry.io/collector/consumer" trjaeger "go.opentelemetry.io/collector/translator/trace/jaeger" "github.com/elastic/beats/v7/libbeat/monitoring" - "github.com/elastic/apm-server/beater/authorization" "github.com/elastic/apm-server/beater/request" ) -var errNotAuthorized = errors.New("not authorized") - type monitoringMap map[request.ResultID]*monitoring.Int func (m monitoringMap) inc(id request.ResultID) { @@ -58,36 +54,3 @@ func consumeBatch( traces := trjaeger.ProtoBatchToInternalTraces(batch) return consumer.ConsumeTraces(ctx, traces) } - -type authFunc func(context.Context, model.Batch) (context.Context, error) - -func noAuth(ctx context.Context, _ model.Batch) (context.Context, error) { - return ctx, nil -} - -func makeAuthFunc(authTag string, authHandler *authorization.Handler) authFunc { - return func(ctx context.Context, batch model.Batch) (context.Context, error) { - var kind, token string - for i, kv := range batch.Process.GetTags() { - if kv.Key != authTag { - continue - } - // Remove the auth tag. - batch.Process.Tags = append(batch.Process.Tags[:i], batch.Process.Tags[i+1:]...) - kind, token = authorization.ParseAuthorizationHeader(kv.VStr) - break - } - auth := authHandler.AuthorizationFor(kind, token) - result, err := auth.AuthorizedFor(ctx, authorization.Resource{}) - if !result.Authorized { - if err != nil { - return nil, errors.Wrap(err, errNotAuthorized.Error()) - } - // NOTE(axw) for now at least, we do not return result.Reason in the error message, - // as it refers to the "Authorization header" which is incorrect for Jaeger. 
- return nil, errNotAuthorized - } - ctx = authorization.ContextWithAuthorization(ctx, auth) - return ctx, nil - } -} diff --git a/beater/jaeger/grpc.go b/beater/jaeger/grpc.go index 4738fa0adbc..3a91eb9493e 100644 --- a/beater/jaeger/grpc.go +++ b/beater/jaeger/grpc.go @@ -27,13 +27,13 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/proto-gen/api_v2" "go.opentelemetry.io/collector/consumer" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/apm-server/agentcfg" + "github.com/elastic/apm-server/beater/authorization" + "github.com/elastic/apm-server/beater/interceptors" "github.com/elastic/apm-server/beater/request" "github.com/elastic/apm-server/processor/otel" ) @@ -60,9 +60,16 @@ const ( getSamplingStrategyFullMethod = "/jaeger.api_v2.SamplingManager/GetSamplingStrategy" ) +// MethodAuthorizationHandlers returns a map of all supported Jaeger/gRPC methods to authorization handlers. +func MethodAuthorizationHandlers(authBuilder *authorization.Builder, authTag string) map[string]interceptors.MethodAuthorizationHandler { + return map[string]interceptors.MethodAuthorizationHandler{ + postSpansFullMethod: postSpansMethodAuthorizationHandler(authBuilder, authTag), + getSamplingStrategyFullMethod: getSamplingStrategyMethodAuthorizationHandler(authBuilder), + } +} + // grpcCollector implements Jaeger api_v2 protocol for receiving tracing data type grpcCollector struct { - auth authFunc consumer consumer.Traces } @@ -78,11 +85,6 @@ func (c *grpcCollector) PostSpans(ctx context.Context, r *api_v2.PostSpansReques } func (c *grpcCollector) postSpans(ctx context.Context, batch model.Batch) error { - ctx, err := c.auth(ctx, batch) - if err != nil { - gRPCCollectorMonitoringMap.inc(request.IDResponseErrorsUnauthorized) - return status.Error(codes.Unauthenticated, err.Error()) - } return consumeBatch(ctx, batch, c.consumer, gRPCCollectorMonitoringMap) } @@ -128,8 +130,28 @@ func (s *grpcSampler) GetSamplingStrategy( } func (s *grpcSampler) fetchSamplingRate(ctx context.Context, service string) (float64, error) { - query := agentcfg.Query{Service: agentcfg.Service{Name: service}, - InsecureAgents: jaegerAgentPrefixes, MarkAsAppliedByAgent: newBool(true)} + // Only service, and not agent, is known for config queries. + // For anonymous/untrusted agents, we filter the results using + // query.InsecureAgents below. 
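+	// The authorization result below is read from the request context, which
+	// the shared Authorization interceptor populates; GetSamplingStrategy is
+	// mapped to the anonymous, token-less handler defined later in this file.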
+ authResource := authorization.Resource{ServiceName: service} + authResult, err := authorization.AuthorizedFor(ctx, authResource) + if err != nil { + return 0, err + } + if !authResult.Authorized { + err := authorization.ErrUnauthorized + if authResult.Reason != "" { + err = fmt.Errorf("%w: %s", err, authResult.Reason) + } + return 0, err + } + + markAsAppliedByAgent := true + query := agentcfg.Query{ + Service: agentcfg.Service{Name: service}, + InsecureAgents: jaegerAgentPrefixes, + MarkAsAppliedByAgent: &markAsAppliedByAgent, + } result, err := s.fetcher.Fetch(ctx, query) if err != nil { gRPCSamplingMonitoringMap.inc(request.IDResponseErrorsServiceUnavailable) @@ -169,4 +191,28 @@ func checkValidationError(err *agentcfg.ValidationError) error { } } -func newBool(b bool) *bool { return &b } +func postSpansMethodAuthorizationHandler(authBuilder *authorization.Builder, authTag string) interceptors.MethodAuthorizationHandler { + authHandler := authBuilder.ForPrivilege(authorization.PrivilegeEventWrite.Action) + return func(ctx context.Context, req interface{}) authorization.Authorization { + postSpansRequest := req.(*api_v2.PostSpansRequest) + batch := &postSpansRequest.Batch + var kind, token string + for i, kv := range batch.Process.GetTags() { + if kv.Key != authTag { + continue + } + // Remove the auth tag. + batch.Process.Tags = append(batch.Process.Tags[:i], batch.Process.Tags[i+1:]...) + kind, token = authorization.ParseAuthorizationHeader(kv.VStr) + break + } + return authHandler.AuthorizationFor(kind, token) + } +} + +func getSamplingStrategyMethodAuthorizationHandler(authBuilder *authorization.Builder) interceptors.MethodAuthorizationHandler { + authHandler := authBuilder.ForPrivilege(authorization.PrivilegeAgentConfigRead.Action) + return func(ctx context.Context, req interface{}) authorization.Authorization { + return authHandler.AuthorizationFor("", "") + } +} diff --git a/beater/jaeger/grpc_test.go b/beater/jaeger/grpc_test.go index d2de61bf7fd..01f191e1017 100644 --- a/beater/jaeger/grpc_test.go +++ b/beater/jaeger/grpc_test.go @@ -24,26 +24,23 @@ import ( "testing" "time" - "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/proto-gen/api_v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/translator/trace/jaeger" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/apm-server/agentcfg" + "github.com/elastic/apm-server/beater/authorization" "github.com/elastic/apm-server/beater/beatertest" + "github.com/elastic/apm-server/beater/config" "github.com/elastic/apm-server/kibana/kibanatest" ) -type authKey struct{} - func TestGRPCCollector_PostSpans(t *testing.T) { for name, tc := range map[string]testGRPCCollector{ "empty request": { @@ -54,21 +51,6 @@ func TestGRPCCollector_PostSpans(t *testing.T) { consumerErr: errors.New("consumer failed"), expectedErr: errors.New("consumer failed"), }, - "auth fails": { - authErr: errors.New("oh noes"), - expectedErr: status.Error(codes.Unauthenticated, "oh noes"), - }, - "auth context": { - auth: func(ctx context.Context, batch model.Batch) (context.Context, error) { - return context.WithValue(ctx, authKey{}, 123), nil - }, - consumer: func(ctx context.Context, td 
pdata.Traces) error { - if ctx.Value(authKey{}) != 123 { - panic("auth context not propagated to consumer") - } - return nil - }, - }, } { t.Run(name, func(t *testing.T) { tc.setup(t) @@ -89,8 +71,6 @@ func TestGRPCCollector_PostSpans(t *testing.T) { type testGRPCCollector struct { request *api_v2.PostSpansRequest consumer tracesConsumerFunc - auth authFunc - authErr error consumerErr error collector *grpcCollector @@ -125,12 +105,7 @@ func (tc *testGRPCCollector) setup(t *testing.T) { return tc.consumerErr } } - if tc.auth == nil { - tc.auth = func(ctx context.Context, _ model.Batch) (context.Context, error) { - return ctx, tc.authErr - } - } - tc.collector = &grpcCollector{tc.auth, tc.consumer} + tc.collector = &grpcCollector{tc.consumer} } type tracesConsumerFunc func(ctx context.Context, td pdata.Traces) error @@ -178,7 +153,12 @@ func TestGRPCSampler_GetSamplingStrategy(t *testing.T) { require.NoError(t, logp.DevelopmentSetup(logp.ToObserverOutput())) tc.setup() params := &api_v2.SamplingStrategyParameters{ServiceName: "serviceA"} - resp, err := tc.sampler.GetSamplingStrategy(context.Background(), params) + + ctx := context.Background() + authBuilder, _ := authorization.NewBuilder(config.AgentAuth{}) + authHandler := authBuilder.ForPrivilege(authorization.PrivilegeAgentConfigRead.Action) + ctx = authorization.ContextWithAuthorization(ctx, authHandler.AuthorizationFor("", "")) + resp, err := tc.sampler.GetSamplingStrategy(ctx, params) // assert sampling response if tc.expectedErrMsg != "" { diff --git a/beater/jaeger/server.go b/beater/jaeger/server.go index 42ef75b226d..59ec606b1a9 100644 --- a/beater/jaeger/server.go +++ b/beater/jaeger/server.go @@ -79,50 +79,42 @@ func NewServer( srv := &Server{logger: logger} if cfg.JaegerConfig.GRPC.Enabled { - var authBuilder *authorization.Builder + var agentAuth config.AgentAuth if cfg.JaegerConfig.GRPC.AuthTag != "" { // By default auth is not required for Jaeger - users // must explicitly specify which tag to use. - // TODO(axw) share auth builder with beater/api. - var err error - authBuilder, err = authorization.NewBuilder(cfg.AgentAuth) - if err != nil { - return nil, err - } + agentAuth = cfg.AgentAuth } - - // TODO(axw) should the listener respect cfg.MaxConnections? - grpcListener, err := net.Listen("tcp", cfg.JaegerConfig.GRPC.Host) + authBuilder, err := authorization.NewBuilder(agentAuth) if err != nil { return nil, err } + logger = logger.Named(logs.Jaeger) - grpcOptions := []grpc.ServerOption{ - grpc.ChainUnaryInterceptor( - apmgrpc.NewUnaryServerInterceptor( - apmgrpc.WithRecovery(), - apmgrpc.WithTracer(tracer), - ), - interceptors.Logging(logger), - interceptors.Metrics(logger, RegistryMonitoringMaps), - interceptors.Timeout(), + grpcInterceptors := []grpc.UnaryServerInterceptor{ + apmgrpc.NewUnaryServerInterceptor( + apmgrpc.WithRecovery(), + apmgrpc.WithTracer(tracer), ), + interceptors.Logging(logger), + interceptors.Metrics(logger, RegistryMonitoringMaps), + interceptors.Timeout(), + interceptors.Authorization(MethodAuthorizationHandlers(authBuilder, cfg.JaegerConfig.GRPC.AuthTag)), } + + // TODO(axw) should the listener respect cfg.MaxConnections? 
+ grpcListener, err := net.Listen("tcp", cfg.JaegerConfig.GRPC.Host) + if err != nil { + return nil, err + } + grpcOptions := []grpc.ServerOption{grpc.ChainUnaryInterceptor(grpcInterceptors...)} if cfg.JaegerConfig.GRPC.TLS != nil { creds := credentials.NewTLS(cfg.JaegerConfig.GRPC.TLS) grpcOptions = append(grpcOptions, grpc.Creds(creds)) } srv.grpc.server = grpc.NewServer(grpcOptions...) srv.grpc.listener = grpcListener - - RegisterGRPCServices( - srv.grpc.server, - authBuilder, - cfg.JaegerConfig.GRPC.AuthTag, - logger, - processor, - fetcher, - ) + RegisterGRPCServices(srv.grpc.server, logger, processor, fetcher) } if cfg.JaegerConfig.HTTP.Enabled { // TODO(axw) should the listener respect cfg.MaxConnections? @@ -149,18 +141,12 @@ func NewServer( // RegisterGRPCServices registers Jaeger gRPC services with srv. func RegisterGRPCServices( srv *grpc.Server, - authBuilder *authorization.Builder, - authTag string, logger *logp.Logger, processor model.BatchProcessor, fetcher agentcfg.Fetcher, ) { - auth := noAuth - if authTag != "" { - auth = makeAuthFunc(authTag, authBuilder.ForPrivilege(authorization.PrivilegeEventWrite.Action)) - } traceConsumer := &otel.Consumer{Processor: processor} - api_v2.RegisterCollectorServiceServer(srv, &grpcCollector{auth, traceConsumer}) + api_v2.RegisterCollectorServiceServer(srv, &grpcCollector{traceConsumer}) api_v2.RegisterSamplingManagerServer(srv, &grpcSampler{logger, fetcher}) } diff --git a/beater/otlp/grpc.go b/beater/otlp/grpc.go index 2ab6f31218e..74b574eeeaa 100644 --- a/beater/otlp/grpc.go +++ b/beater/otlp/grpc.go @@ -25,6 +25,8 @@ import ( "go.opentelemetry.io/collector/receiver/otlpreceiver" "google.golang.org/grpc" + "github.com/elastic/apm-server/beater/authorization" + "github.com/elastic/apm-server/beater/interceptors" "github.com/elastic/apm-server/beater/request" "github.com/elastic/apm-server/model" "github.com/elastic/apm-server/processor/otel" @@ -58,6 +60,17 @@ func init() { monitoring.NewFunc(gRPCMetricsRegistry, "consumer", collectMetricsMonitoring, monitoring.Report) } +// MethodAuthorizationHandlers returns a map of all supported OTLP/gRPC methods to authorization handlers. +func MethodAuthorizationHandlers(authBuilder *authorization.Builder) map[string]interceptors.MethodAuthorizationHandler { + eventWriteMethodAuthorizationHandler := interceptors.MetadataMethodAuthorizationHandler( + authBuilder.ForPrivilege(authorization.PrivilegeEventWrite.Action), + ) + return map[string]interceptors.MethodAuthorizationHandler{ + metricsFullMethod: eventWriteMethodAuthorizationHandler, + tracesFullMethod: eventWriteMethodAuthorizationHandler, + } +} + // RegisterGRPCServices registers OTLP consumer services with the given gRPC server. 
func RegisterGRPCServices(grpcServer *grpc.Server, processor model.BatchProcessor) error { consumer := &otel.Consumer{Processor: processor} diff --git a/beater/server.go b/beater/server.go index 11179ebb31b..1d6ed797673 100644 --- a/beater/server.go +++ b/beater/server.go @@ -163,7 +163,10 @@ func newGRPCServer( } apmInterceptor := apmgrpc.NewUnaryServerInterceptor(apmgrpc.WithRecovery(), apmgrpc.WithTracer(tracer)) - authInterceptor := newAuthUnaryServerInterceptor(authBuilder) + authInterceptor := interceptors.Authorization( + otlp.MethodAuthorizationHandlers(authBuilder), + jaeger.MethodAuthorizationHandlers(authBuilder, jaeger.ElasticAuthTag), + ) // Note that we intentionally do not use a grpc.Creds ServerOption // even if TLS is enabled, as TLS is handled by the net/http server. @@ -196,7 +199,7 @@ func newGRPCServer( batchProcessor, } - jaeger.RegisterGRPCServices(srv, authBuilder, jaeger.ElasticAuthTag, logger, batchProcessor, agentcfgFetcher) + jaeger.RegisterGRPCServices(srv, logger, batchProcessor, agentcfgFetcher) if err := otlp.RegisterGRPCServices(srv, batchProcessor); err != nil { return nil, err } diff --git a/tests/system/test_jaeger.py b/tests/system/test_jaeger.py index 058d556fcc8..5c74d24f7d5 100644 --- a/tests/system/test_jaeger.py +++ b/tests/system/test_jaeger.py @@ -93,7 +93,7 @@ def test_jaeger_unauthorized(self): ) stdout, stderr = proc.communicate() self.assertNotEqual(proc.returncode, 0) - self.assertRegex(stderr.decode("utf-8"), "not authorized") + self.assertRegex(stderr.decode("utf-8"), "Unauthenticated") def test_jaeger_authorized(self): """ From 97a6f9b86f2c30493696e2f9d10e1236accbe672 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Wed, 23 Jun 2021 16:37:28 +0800 Subject: [PATCH 2/3] beater/api/intake: fix rate limiting (#5518) * beater/api/intake: fix rate limiting Copy the batch processor in the closure before copying, to avoid wrapping it multiple times. * systemtest: fix rate limiting test The test was wrong, and only passed because of the bug that has been fixed. We should be using up the per-IP event limits on the first two requests, but weren't taking into account the burst multiplier (limit x 3). --- beater/api/intake/handler.go | 2 ++ beater/api/intake/handler_test.go | 26 ++++++++++++++++++++++++++ systemtest/rum_test.go | 6 ++---- 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/beater/api/intake/handler.go b/beater/api/intake/handler.go index 3d9a4d54937..6a29d24abfe 100644 --- a/beater/api/intake/handler.go +++ b/beater/api/intake/handler.go @@ -81,6 +81,8 @@ func Handler(handler StreamHandler, requestMetadataFunc RequestMetadataFunc, bat return } + // copy batchProcessor to avoid updating closure below + batchProcessor := batchProcessor if limiter, ok := ratelimit.FromContext(c.Request.Context()); ok { // Apply rate limiting after reading but before processing any events. batchProcessor = modelprocessor.Chained{ diff --git a/beater/api/intake/handler_test.go b/beater/api/intake/handler_test.go index ed4074520f4..1c13b1fb88e 100644 --- a/beater/api/intake/handler_test.go +++ b/beater/api/intake/handler_test.go @@ -217,6 +217,32 @@ func TestRateLimiting(t *testing.T) { } } +func TestRateLimitingRequests(t *testing.T) { + // Check that rate limiting across multiple requests is handled correctly. + // + // ratelimit.ndjson contains 19 events, and we rate limit in batches of 10 + // events. The burst of 41 should be enough for 2 iterations with one left. 
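+	// That is, each request consumes two batches (2 x 10 tokens), so the two
+	// requests below use 40 of the 41-token burst, leaving exactly one token
+	// for the final Allow()/!Allow() assertions.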
+ limiter := rate.NewLimiter(1, 41) + processor := stream.BackendProcessor(config.DefaultConfig()) + handler := Handler(processor, emptyRequestMetadata, modelprocessor.Nop{}) + + data, err := ioutil.ReadFile("../../../testdata/intake-v2/ratelimit.ndjson") + require.NoError(t, err) + for i := 0; i < 2; i++ { + r := httptest.NewRequest("POST", "/", bytes.NewBuffer(data)) + r = r.WithContext(ratelimit.ContextWithLimiter(r.Context(), limiter)) + r.Header.Add("Content-Type", "application/x-ndjson") + + w := httptest.NewRecorder() + c := request.NewContext() + c.Reset(w, r) + handler(c) + assert.Equal(t, http.StatusAccepted, w.Code) + } + assert.True(t, limiter.Allow()) + assert.False(t, limiter.Allow()) +} + type testcaseIntakeHandler struct { c *request.Context w *httptest.ResponseRecorder diff --git a/systemtest/rum_test.go b/systemtest/rum_test.go index 4d82b0a71ef..59a4ef03764 100644 --- a/systemtest/rum_test.go +++ b/systemtest/rum_test.go @@ -184,13 +184,11 @@ func TestRUMRateLimit(t *testing.T) { // Just check that rate limiting is wired up. More specific rate limiting scenarios are unit tested. var g errgroup.Group - g.Go(func() error { return sendEvents("10.11.12.13", srv.Config.RUM.RateLimit.EventLimit) }) - g.Go(func() error { return sendEvents("10.11.12.14", srv.Config.RUM.RateLimit.EventLimit) }) + g.Go(func() error { return sendEvents("10.11.12.13", srv.Config.RUM.RateLimit.EventLimit*3) }) + g.Go(func() error { return sendEvents("10.11.12.14", srv.Config.RUM.RateLimit.EventLimit*3) }) assert.NoError(t, g.Wait()) g = errgroup.Group{} - g.Go(func() error { return sendEvents("10.11.12.13", srv.Config.RUM.RateLimit.EventLimit) }) - g.Go(func() error { return sendEvents("10.11.12.14", srv.Config.RUM.RateLimit.EventLimit) }) g.Go(func() error { return sendEvents("10.11.12.15", srv.Config.RUM.RateLimit.EventLimit) }) err = g.Wait() require.Error(t, err) From 9453fe24064ea429a82160c3321ea09a6c4603a4 Mon Sep 17 00:00:00 2001 From: stuart nelson Date: Wed, 23 Jun 2021 13:27:14 +0200 Subject: [PATCH 3/3] Service specific source maps (#5410) Serve service-specific sourcemaps from fleet-server --- _meta/beat.yml | 5 +- apm-server.docker.yml | 5 +- apm-server.yml | 5 +- apmpackage/apm/manifest.yml | 6 - beater/beater.go | 68 +++++++--- beater/beater_test.go | 68 +++++++++- beater/config/config_test.go | 17 +++ beater/config/integration.go | 7 +- beater/config/rum.go | 22 ++-- beater/config/sourcemapping.go | 26 ++++ changelogs/head.asciidoc | 1 + model/error_test.go | 2 +- model/sourcemap_test.go | 2 +- model/stacktrace_frame_test.go | 2 +- sourcemap/es_store.go | 15 +++ sourcemap/fleet_store.go | 139 +++++++++++++++++++++ sourcemap/fleet_store_test.go | 79 ++++++++++++ sourcemap/store.go | 82 +++++++++--- sourcemap/store_test.go | 139 ++++++++++++++++++++- tests/system/test_integration_sourcemap.py | 23 ---- 20 files changed, 619 insertions(+), 94 deletions(-) create mode 100644 beater/config/sourcemapping.go create mode 100644 sourcemap/fleet_store.go create mode 100644 sourcemap/fleet_store_test.go diff --git a/_meta/beat.yml b/_meta/beat.yml index 9dee7ba5fb5..bc24d73ffe5 100644 --- a/_meta/beat.yml +++ b/_meta/beat.yml @@ -263,9 +263,12 @@ apm-server: # Sourcemapping is enabled by default. #enabled: true - # Source maps are always fetched from Elasticsearch, by default using the output.elasticsearch configuration. + # Source maps may be fetched from Elasticsearch by using the + # output.elasticsearch configuration. # A different instance must be configured when using any other output. 
# This setting only affects sourcemap reads - the output determines where sourcemaps are written. + # Note: Configuring elasticsearch is not supported if apm-server is being + # managed by Fleet. #elasticsearch: # Array of hosts to connect to. # Scheme and port can be left out and will be set to the default (`http` and `9200`). diff --git a/apm-server.docker.yml b/apm-server.docker.yml index 12a6cbf82c6..28f224082a2 100644 --- a/apm-server.docker.yml +++ b/apm-server.docker.yml @@ -263,9 +263,12 @@ apm-server: # Sourcemapping is enabled by default. #enabled: true - # Source maps are always fetched from Elasticsearch, by default using the output.elasticsearch configuration. + # Source maps may be fetched from Elasticsearch by using the + # output.elasticsearch configuration. # A different instance must be configured when using any other output. # This setting only affects sourcemap reads - the output determines where sourcemaps are written. + # Note: Configuring elasticsearch is not supported if apm-server is being + # managed by Fleet. #elasticsearch: # Array of hosts to connect to. # Scheme and port can be left out and will be set to the default (`http` and `9200`). diff --git a/apm-server.yml b/apm-server.yml index 7ac9574daad..f78e05193a0 100644 --- a/apm-server.yml +++ b/apm-server.yml @@ -263,9 +263,12 @@ apm-server: # Sourcemapping is enabled by default. #enabled: true - # Source maps are always fetched from Elasticsearch, by default using the output.elasticsearch configuration. + # Source maps may be fetched from Elasticsearch by using the + # output.elasticsearch configuration. # A different instance must be configured when using any other output. # This setting only affects sourcemap reads - the output determines where sourcemaps are written. + # Note: Configuring elasticsearch is not supported if apm-server is being + # managed by Fleet. #elasticsearch: # Array of hosts to connect to. # Scheme and port can be left out and will be set to the default (`http` and `9200`). diff --git a/apmpackage/apm/manifest.yml b/apmpackage/apm/manifest.yml index e99c15f177b..77b07b6faea 100644 --- a/apmpackage/apm/manifest.yml +++ b/apmpackage/apm/manifest.yml @@ -120,12 +120,6 @@ policy_templates: required: false show_user: false default: 10000 - - name: sourcemap_api_key - type: text - title: RUM - API Key for Sourcemaps - required: false - description: API Key for sourcemap feature. 
Enter as : - show_user: false - name: api_key_limit type: integer title: Maximum number of API Keys for Agent authentication diff --git a/beater/beater.go b/beater/beater.go index 0945ed8d549..c45177e05b9 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -20,6 +20,7 @@ package beater import ( "context" "net" + "net/http" "regexp" "runtime" "strings" @@ -27,11 +28,12 @@ import ( "time" "github.com/elastic/beats/v7/libbeat/common/fleetmode" - - "github.com/elastic/beats/v7/libbeat/kibana" + "github.com/elastic/beats/v7/libbeat/common/transport" + "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" "github.com/pkg/errors" "go.elastic.co/apm" + "go.elastic.co/apm/module/apmhttp" "golang.org/x/sync/errgroup" "github.com/elastic/beats/v7/libbeat/beat" @@ -256,8 +258,8 @@ func (s *serverCreator) Create(p beat.PipelineConnector, rawConfig *common.Confi sharedServerRunnerParams: s.args, Namespace: namespace, Pipeline: p, - KibanaConfig: &integrationConfig.Fleet.Kibana, RawConfig: apmServerCommonConfig, + FleetConfig: &integrationConfig.Fleet, }) } @@ -279,6 +281,7 @@ type serverRunner struct { acker *waitPublishedAcker namespace string config *config.Config + fleetConfig *config.Fleet beat *beat.Beat logger *logp.Logger tracer *apm.Tracer @@ -289,10 +292,10 @@ type serverRunner struct { type serverRunnerParams struct { sharedServerRunnerParams - Namespace string - Pipeline beat.PipelineConnector - KibanaConfig *kibana.ClientConfig - RawConfig *common.Config + Namespace string + Pipeline beat.PipelineConnector + RawConfig *common.Config + FleetConfig *config.Fleet } type sharedServerRunnerParams struct { @@ -310,10 +313,6 @@ func newServerRunner(ctx context.Context, args serverRunnerParams) (*serverRunne return nil, err } - if cfg.DataStreams.Enabled && args.KibanaConfig != nil { - cfg.Kibana.ClientConfig = *args.KibanaConfig - } - runServerContext, cancel := context.WithCancel(ctx) return &serverRunner{ backgroundContext: ctx, @@ -321,6 +320,7 @@ func newServerRunner(ctx context.Context, args serverRunnerParams) (*serverRunne cancelRunServerContext: cancel, config: cfg, + fleetConfig: args.FleetConfig, acker: args.Acker, pipeline: args.Pipeline, namespace: args.Namespace, @@ -359,8 +359,7 @@ func (s *serverRunner) Start() { func (s *serverRunner) run() error { // Send config to telemetry. 
recordAPMServerConfig(s.config) - - transformConfig, err := newTransformConfig(s.beat.Info, s.config) + transformConfig, err := newTransformConfig(s.beat.Info, s.config, s.fleetConfig) if err != nil { return err } @@ -602,7 +601,7 @@ func runServerWithTracerServer(runServer RunServerFunc, tracerServer *tracerServ } } -func newTransformConfig(beatInfo beat.Info, cfg *config.Config) (*transform.Config, error) { +func newTransformConfig(beatInfo beat.Info, cfg *config.Config, fleetCfg *config.Fleet) (*transform.Config, error) { transformConfig := &transform.Config{ DataStreams: cfg.DataStreams.Enabled, RUM: transform.RUMConfig{ @@ -611,8 +610,8 @@ func newTransformConfig(beatInfo beat.Info, cfg *config.Config) (*transform.Conf }, } - if cfg.RumConfig.Enabled && cfg.RumConfig.SourceMapping.Enabled && cfg.RumConfig.SourceMapping.ESConfig != nil { - store, err := newSourcemapStore(beatInfo, cfg.RumConfig.SourceMapping) + if cfg.RumConfig.Enabled && cfg.RumConfig.SourceMapping.Enabled { + store, err := newSourcemapStore(beatInfo, cfg.RumConfig.SourceMapping, fleetCfg) if err != nil { return nil, err } @@ -622,13 +621,44 @@ func newTransformConfig(beatInfo beat.Info, cfg *config.Config) (*transform.Conf return transformConfig, nil } -func newSourcemapStore(beatInfo beat.Info, cfg config.SourceMapping) (*sourcemap.Store, error) { - esClient, err := elasticsearch.NewClient(cfg.ESConfig) +func newSourcemapStore(beatInfo beat.Info, cfg config.SourceMapping, fleetCfg *config.Fleet) (*sourcemap.Store, error) { + if fleetCfg != nil { + var ( + c = *http.DefaultClient + rt = http.DefaultTransport + ) + var tlsConfig *tlscommon.TLSConfig + var err error + if fleetCfg.TLS.IsEnabled() { + if tlsConfig, err = tlscommon.LoadTLSConfig(fleetCfg.TLS); err != nil { + return nil, err + } + } + + // Default for es is 90s :shrug: + timeout := 30 * time.Second + dialer := transport.NetDialer(timeout) + tlsDialer, err := transport.TLSDialer(dialer, tlsConfig, timeout) + if err != nil { + return nil, err + } + + rt = &http.Transport{ + Proxy: http.ProxyFromEnvironment, + Dial: dialer.Dial, + DialTLS: tlsDialer.Dial, + TLSClientConfig: tlsConfig.ToConfig(), + } + + c.Transport = apmhttp.WrapRoundTripper(rt) + return sourcemap.NewFleetStore(&c, fleetCfg, cfg.Metadata, cfg.Cache.Expiration) + } + c, err := elasticsearch.NewClient(cfg.ESConfig) if err != nil { return nil, err } index := strings.ReplaceAll(cfg.IndexPattern, "%{[observer.version]}", beatInfo.Version) - return sourcemap.NewStore(esClient, index, cfg.Cache.Expiration) + return sourcemap.NewElasticsearchStore(c, index, cfg.Cache.Expiration) } // WrapRunServerWithProcessors wraps runServer such that it wraps args.Reporter diff --git a/beater/beater_test.go b/beater/beater_test.go index b2123af23f2..b7295aac0c0 100644 --- a/beater/beater_test.go +++ b/beater/beater_test.go @@ -18,6 +18,7 @@ package beater import ( + "compress/zlib" "context" "errors" "net" @@ -36,7 +37,9 @@ import ( "go.uber.org/zap/zaptest/observer" "github.com/elastic/apm-server/beater/config" + "github.com/elastic/apm-server/elasticsearch" "github.com/elastic/apm-server/model" + "github.com/elastic/apm-server/sourcemap/test" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/instrumentation" @@ -246,7 +249,7 @@ func TestTransformConfigIndex(t *testing.T) { cfg.RumConfig.SourceMapping.IndexPattern = indexPattern } - transformConfig, err := 
newTransformConfig(beat.Info{Version: "1.2.3"}, cfg) + transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg, nil) require.NoError(t, err) require.NotNil(t, transformConfig.RUM.SourcemapStore) transformConfig.RUM.SourcemapStore.Added(context.Background(), "name", "version", "path") @@ -266,7 +269,7 @@ func TestTransformConfig(t *testing.T) { cfg := config.DefaultConfig() cfg.RumConfig.Enabled = rumEnabled cfg.RumConfig.SourceMapping.Enabled = sourcemapEnabled - transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg) + transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg, nil) require.NoError(t, err) if expectSourcemapStore { assert.NotNil(t, transformConfig.RUM.SourcemapStore) @@ -280,3 +283,64 @@ func TestTransformConfig(t *testing.T) { test(true, false, false) test(true, true, true) } + +func TestStoreUsesRUMElasticsearchConfig(t *testing.T) { + var called bool + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called = true + w.Write([]byte(test.ValidSourcemap)) + })) + defer ts.Close() + + cfg := config.DefaultConfig() + cfg.RumConfig.Enabled = true + cfg.RumConfig.SourceMapping.Enabled = true + cfg.RumConfig.SourceMapping.ESConfig = elasticsearch.DefaultConfig() + cfg.RumConfig.SourceMapping.ESConfig.Hosts = []string{ts.URL} + + transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg, nil) + require.NoError(t, err) + // Check that the provided rum elasticsearch config was used and + // Fetch() goes to the test server. + _, err = transformConfig.RUM.SourcemapStore.Fetch(context.Background(), "app", "1.0", "/bundle/path") + require.NoError(t, err) + + assert.True(t, called) +} + +func TestFleetStoreUsed(t *testing.T) { + var called bool + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called = true + wr := zlib.NewWriter(w) + defer wr.Close() + wr.Write([]byte(test.ValidSourcemap)) + })) + defer ts.Close() + + cfg := config.DefaultConfig() + cfg.RumConfig.Enabled = true + cfg.RumConfig.SourceMapping.Enabled = true + cfg.RumConfig.SourceMapping.Metadata = []config.SourceMapMetadata{{ + ServiceName: "app", + ServiceVersion: "1.0", + BundleFilepath: "/bundle/path", + SourceMapURL: "/my/path", + }} + + fleetCfg := &config.Fleet{ + Hosts: []string{ts.URL[7:]}, + Protocol: "http", + AccessAPIKey: "my-key", + TLS: nil, + } + + transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg, fleetCfg) + require.NoError(t, err) + // Check that the provided rum elasticsearch config was used and + // Fetch() goes to the test server. 
+ _, err = transformConfig.RUM.SourcemapStore.Fetch(context.Background(), "app", "1.0", "/bundle/path") + require.NoError(t, err) + + assert.True(t, called) +} diff --git a/beater/config/config_test.go b/beater/config/config_test.go index 9b391b9cc2e..32e68dd031c 100644 --- a/beater/config/config_test.go +++ b/beater/config/config_test.go @@ -209,6 +209,7 @@ func TestUnpackConfig(t *testing.T) { MaxRetries: 3, Backoff: elasticsearch.DefaultBackoffConfig, }, + Metadata: []SourceMapMetadata{}, esConfigured: true, }, LibraryPattern: "^custom", @@ -295,6 +296,14 @@ func TestUnpackConfig(t *testing.T) { "rum": map[string]interface{}{ "enabled": true, "source_mapping": map[string]interface{}{ + "metadata": []map[string]string{ + { + "service.name": "opbeans-rum", + "service.version": "1.2.3", + "bundle.filepath": "/test/e2e/general-usecase/bundle.js.map", + "sourcemap.url": "http://somewhere.com/bundle.js.map", + }, + }, "cache": map[string]interface{}{ "expiration": 7, }, @@ -376,6 +385,14 @@ func TestUnpackConfig(t *testing.T) { }, IndexPattern: "apm-*-sourcemap*", ESConfig: elasticsearch.DefaultConfig(), + Metadata: []SourceMapMetadata{ + { + ServiceName: "opbeans-rum", + ServiceVersion: "1.2.3", + BundleFilepath: "/test/e2e/general-usecase/bundle.js.map", + SourceMapURL: "http://somewhere.com/bundle.js.map", + }, + }, }, LibraryPattern: "rum", ExcludeFromGrouping: "^/webpack", diff --git a/beater/config/integration.go b/beater/config/integration.go index 99c5c321e15..9c6c61d509f 100644 --- a/beater/config/integration.go +++ b/beater/config/integration.go @@ -21,7 +21,7 @@ import ( "errors" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/kibana" + "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" ) func NewIntegrationConfig(rootConfig *common.Config) (*IntegrationConfig, error) { @@ -66,5 +66,8 @@ type Package struct { } type Fleet struct { - Kibana kibana.ClientConfig `config:"kibana"` + Hosts []string `config:"hosts"` + Protocol string `config:"protocol"` + AccessAPIKey string `config:"access_api_key"` + TLS *tlscommon.Config `config:"ssl"` } diff --git a/beater/config/rum.go b/beater/config/rum.go index 000280d9c3a..1e61e98b344 100644 --- a/beater/config/rum.go +++ b/beater/config/rum.go @@ -58,12 +58,13 @@ type EventRate struct { LruSize int `config:"lru_size"` } -// SourceMapping holds sourecemap config information +// SourceMapping holds sourcemap config information type SourceMapping struct { Cache Cache `config:"cache"` Enabled bool `config:"enabled"` IndexPattern string `config:"index_pattern"` ESConfig *elasticsearch.Config `config:"elasticsearch"` + Metadata []SourceMapMetadata `config:"metadata"` esConfigured bool } @@ -79,14 +80,13 @@ func (c *RumConfig) setup(log *logp.Logger, dataStreamsEnabled bool, outputESCfg return errors.Wrapf(err, "Invalid regex for `exclude_from_grouping`: ") } - var apiKey string - if c.SourceMapping.esConfigured { - if dataStreamsEnabled { - // when running under Fleet, the only setting configured is the api key - apiKey = c.SourceMapping.ESConfig.APIKey - } else { - return nil - } + if c.SourceMapping.esConfigured && len(c.SourceMapping.Metadata) > 0 { + return errors.New("configuring both source_mapping.elasticsearch and sourcemapping.source_maps not allowed") + } + + // No need to unpack the ESConfig if SourceMapMetadata exist + if len(c.SourceMapping.Metadata) > 0 { + return nil } // fall back to elasticsearch output configuration for sourcemap storage if possible @@ -98,9 
+98,6 @@ func (c *RumConfig) setup(log *logp.Logger, dataStreamsEnabled bool, outputESCfg if err := outputESCfg.Unpack(c.SourceMapping.ESConfig); err != nil { return errors.Wrap(err, "unpacking Elasticsearch config into Sourcemap config") } - if c.SourceMapping.ESConfig.APIKey == "" { - c.SourceMapping.ESConfig.APIKey = apiKey - } return nil } @@ -119,6 +116,7 @@ func defaultSourcemapping() SourceMapping { Cache: Cache{Expiration: defaultSourcemapCacheExpiration}, IndexPattern: defaultSourcemapIndexPattern, ESConfig: elasticsearch.DefaultConfig(), + Metadata: []SourceMapMetadata{}, } } diff --git a/beater/config/sourcemapping.go b/beater/config/sourcemapping.go new file mode 100644 index 00000000000..ab98cf44426 --- /dev/null +++ b/beater/config/sourcemapping.go @@ -0,0 +1,26 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package config + +// SourceMapMetadata holds source map configuration information. +type SourceMapMetadata struct { + ServiceName string `config:"service.name"` + ServiceVersion string `config:"service.version"` + BundleFilepath string `config:"bundle.filepath"` + SourceMapURL string `config:"sourcemap.url"` +} diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc index f7e4da13526..903614c6086 100644 --- a/changelogs/head.asciidoc +++ b/changelogs/head.asciidoc @@ -27,6 +27,7 @@ https://github.com/elastic/apm-server/compare/7.13\...master[View commits] * Add support for histograms to metrics intake {pull}5360[5360] * Upgrade Go to 1.16.5 {pull}5454[5454] * Add units to metric fields {pull}5395[5395] +* Support fetching sourcemaps from fleet {pull}5410[5410] * Add support for adjusting OTel event timestamps using `telemetry.sdk.elastic_export_timestamp` {pull}5433[5433] * Add support for OpenTelemetry labels describing mobile connectivity {pull}5436[5436] * Introduce `apm-server.auth.*` config {pull}5457[5457] diff --git a/model/error_test.go b/model/error_test.go index 9bd019d9a24..781b20f79f1 100644 --- a/model/error_test.go +++ b/model/error_test.go @@ -805,7 +805,7 @@ func TestSourcemapping(t *testing.T) { assert.Equal(t, 1, *event.Exception.Stacktrace[0].Lineno) // transform with sourcemap store - store, err := sourcemap.NewStore(test.ESClientWithValidSourcemap(t), "apm-*sourcemap*", time.Minute) + store, err := sourcemap.NewElasticsearchStore(test.ESClientWithValidSourcemap(t), "apm-*sourcemap*", time.Minute) require.NoError(t, err) transformedWithSourcemap := event.fields(context.Background(), &transform.Config{ RUM: transform.RUMConfig{SourcemapStore: store}, diff --git a/model/sourcemap_test.go b/model/sourcemap_test.go index 2b9e6d20414..6ddec9ec273 100644 --- a/model/sourcemap_test.go +++ b/model/sourcemap_test.go @@ -86,7 +86,7 @@ func TestInvalidateCache(t *testing.T) { // create sourcemap 
store client, err := estest.NewElasticsearchClient(estest.NewTransport(t, http.StatusOK, nil)) require.NoError(t, err) - store, err := sourcemap.NewStore(client, "foo", time.Minute) + store, err := sourcemap.NewElasticsearchStore(client, "foo", time.Minute) require.NoError(t, err) // transform with sourcemap store diff --git a/model/stacktrace_frame_test.go b/model/stacktrace_frame_test.go index b3f00cd8e56..8c890b8afbb 100644 --- a/model/stacktrace_frame_test.go +++ b/model/stacktrace_frame_test.go @@ -422,7 +422,7 @@ func TestLibraryFrame(t *testing.T) { } func testSourcemapStore(t *testing.T, client elasticsearch.Client) *sourcemap.Store { - store, err := sourcemap.NewStore(client, "apm-*sourcemap*", time.Minute) + store, err := sourcemap.NewElasticsearchStore(client, "apm-*sourcemap*", time.Minute) require.NoError(t, err) return store } diff --git a/sourcemap/es_store.go b/sourcemap/es_store.go index 84afd0ca7c8..ad999f4d17e 100644 --- a/sourcemap/es_store.go +++ b/sourcemap/es_store.go @@ -25,12 +25,14 @@ import ( "io" "io/ioutil" "net/http" + "time" "github.com/pkg/errors" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/apm-server/elasticsearch" + logs "github.com/elastic/apm-server/log" "github.com/elastic/apm-server/utility" ) @@ -65,6 +67,19 @@ type esSourcemapResponse struct { } `json:"hits"` } +// NewElasticsearchStore returns an instance of Store for interacting with +// sourcemaps stored in ElasticSearch. +func NewElasticsearchStore( + c elasticsearch.Client, + index string, + expiration time.Duration, +) (*Store, error) { + logger := logp.NewLogger(logs.Sourcemap) + s := &esStore{c, index, logger} + + return newStore(s, logger, expiration) +} + func (s *esStore) fetch(ctx context.Context, name, version, path string) (string, error) { statusCode, body, err := s.runSearchQuery(ctx, name, version, path) if err != nil { diff --git a/sourcemap/fleet_store.go b/sourcemap/fleet_store.go new file mode 100644 index 00000000000..f2108435952 --- /dev/null +++ b/sourcemap/fleet_store.go @@ -0,0 +1,139 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package sourcemap + +import ( + "bytes" + "compress/zlib" + "context" + "fmt" + "io" + "io/ioutil" + "net/http" + "time" + + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + + "github.com/elastic/apm-server/beater/config" + logs "github.com/elastic/apm-server/log" +) + +type fleetStore struct { + apikey string + c *http.Client + fleetURLs map[key]string +} + +type key struct { + ServiceName string + ServiceVersion string + BundleFilepath string +} + +// NewFleetStore returns an instance of Store for interacting with sourcemaps +// stored in Fleet-Server. +func NewFleetStore( + c *http.Client, + fleetCfg *config.Fleet, + cfgs []config.SourceMapMetadata, + expiration time.Duration, +) (*Store, error) { + if len(fleetCfg.Hosts) < 1 { + return nil, errors.New("no fleet hosts present for fleet store") + } + logger := logp.NewLogger(logs.Sourcemap) + s, err := newFleetStore(c, fleetCfg, cfgs) + if err != nil { + return nil, err + } + return newStore(s, logger, expiration) +} + +func newFleetStore( + c *http.Client, + fleetCfg *config.Fleet, + cfgs []config.SourceMapMetadata, +) (fleetStore, error) { + // TODO(stn): Add support for multiple fleet hosts + // cf. https://github.com/elastic/apm-server/issues/5514 + host := fleetCfg.Hosts[0] + fleetURLs := make(map[key]string) + + for _, cfg := range cfgs { + k := key{cfg.ServiceName, cfg.ServiceVersion, cfg.BundleFilepath} + u, err := common.MakeURL(fleetCfg.Protocol, cfg.SourceMapURL, host, 8220) + if err != nil { + return fleetStore{}, err + } + fleetURLs[k] = u + } + return fleetStore{ + apikey: "ApiKey " + fleetCfg.AccessAPIKey, + fleetURLs: fleetURLs, + c: c, + }, nil +} + +func (f fleetStore) fetch(ctx context.Context, name, version, path string) (string, error) { + k := key{name, version, path} + fleetURL, ok := f.fleetURLs[k] + if !ok { + return "", fmt.Errorf("unable to find sourcemap.url for service.name=%s service.version=%s bundle.path=%s", + name, version, path, + ) + } + + req, err := http.NewRequest(http.MethodGet, fleetURL, nil) + if err != nil { + return "", err + } + req.Header.Add("Authorization", f.apikey) + + resp, err := f.c.Do(req.WithContext(ctx)) + if err != nil { + return "", err + } + defer resp.Body.Close() + + // Verify that we should only get 200 back from fleet-server + if resp.StatusCode != http.StatusOK { + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return "", fmt.Errorf("failure querying fleet: statuscode=%d response=(failed to read body)", resp.StatusCode) + } + return "", fmt.Errorf("failure querying fleet: statuscode=%d response=%s", resp.StatusCode, body) + } + + // Looking at the index in elasticsearch, currently + // - no encryption + // - zlib compression + r, err := zlib.NewReader(resp.Body) + if err != nil { + return "", err + } + + buf := new(bytes.Buffer) + if _, err := io.Copy(buf, r); err != nil { + return "", err + } + + return buf.String(), nil +} diff --git a/sourcemap/fleet_store_test.go b/sourcemap/fleet_store_test.go new file mode 100644 index 00000000000..66b36340e96 --- /dev/null +++ b/sourcemap/fleet_store_test.go @@ -0,0 +1,79 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. 
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sourcemap
+
+import (
+	"compress/zlib"
+	"context"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+
+	"github.com/elastic/apm-server/beater/config"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestFleetFetch(t *testing.T) {
+	var (
+		hasAuth       bool
+		apikey        = "supersecret"
+		name          = "webapp"
+		version       = "1.0.0"
+		path          = "/my/path/to/bundle.js.map"
+		wantRes       = "sourcemap response"
+		c             = http.DefaultClient
+		sourceMapPath = "/api/fleet/artifact"
+	)
+
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		assert.Equal(t, sourceMapPath, r.URL.Path)
+		auth := r.Header.Get("Authorization")
+		hasAuth = auth == "ApiKey "+apikey
+		// zlib compress
+		wr := zlib.NewWriter(w)
+		defer wr.Close()
+		wr.Write([]byte(wantRes))
+	}))
+	defer ts.Close()
+
+	fleetCfg := &config.Fleet{
+		Hosts:        []string{ts.URL[7:]},
+		Protocol:     "http",
+		AccessAPIKey: apikey,
+		TLS:          nil,
+	}
+
+	cfgs := []config.SourceMapMetadata{
+		{
+			ServiceName:    name,
+			ServiceVersion: version,
+			BundleFilepath: path,
+			SourceMapURL:   sourceMapPath,
+		},
+	}
+	fb, err := newFleetStore(c, fleetCfg, cfgs)
+	assert.NoError(t, err)
+
+	gotRes, err := fb.fetch(context.Background(), name, version, path)
+	assert.NoError(t, err)
+
+	assert.Equal(t, wantRes, gotRes)
+
+	assert.True(t, hasAuth)
+}
diff --git a/sourcemap/store.go b/sourcemap/store.go
index 462ffedd3d4..aec47b54833 100644
--- a/sourcemap/store.go
+++ b/sourcemap/store.go
@@ -21,17 +21,14 @@ import (
 	"context"
 	"math"
 	"strings"
+	"sync"
 	"time"

-	"github.com/elastic/apm-server/elasticsearch"
-
 	"github.com/go-sourcemap/sourcemap"
 	gocache "github.com/patrickmn/go-cache"
 	"github.com/pkg/errors"

 	"github.com/elastic/beats/v7/libbeat/logp"
-
-	logs "github.com/elastic/apm-server/log"
 )

 const (
@@ -42,30 +39,41 @@ var (
 	errInit = errors.New("Cache cannot be initialized. Expiration and CleanupInterval need to be >= 0")
 )

-// Store holds information necessary to fetch a sourcemap, either from an Elasticsearch instance or an internal cache.
+// Store holds information necessary to fetch a sourcemap, either from an
+// Elasticsearch instance or an internal cache.
 type Store struct {
 	cache   *gocache.Cache
-	esStore *esStore
+	backend backend
 	logger  *logp.Logger
+
+	mu       sync.Mutex
+	inflight map[string]chan struct{}
 }

-// NewStore creates a new instance for fetching sourcemaps. The client and index parameters are needed to be able to
-// fetch sourcemaps from Elasticsearch. The expiration time is used for the internal cache.
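The store.go changes above swap the hard-wired esStore field for a small unexported backend seam; the backend interface and newStore constructor defined just below accept any implementation of fetch. As a rough sketch of what that seam looks like from inside the package (the inMemoryBackend type is hypothetical and only meaningful within package sourcemap, for example in a test; it is not part of this patch, and it assumes the context and fmt imports):

    // Hypothetical in-package backend used purely for illustration.
    type inMemoryBackend map[string]string

    func (m inMemoryBackend) fetch(ctx context.Context, name, version, path string) (string, error) {
        if s, ok := m[name+"_"+version+"_"+path]; ok {
            return s, nil
        }
        return "", fmt.Errorf("no sourcemap for %s %s %s", name, version, path)
    }

    // Any such backend can then be wrapped by the shared caching layer:
    //   store, err := newStore(inMemoryBackend{...}, logp.NewLogger(logs.Sourcemap), time.Minute)

Both NewElasticsearchStore and NewFleetStore are thin wrappers that build their concrete backend and hand it to newStore in exactly this way.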
-func NewStore(client elasticsearch.Client, index string, expiration time.Duration) (*Store, error) {
-	if expiration < 0 {
+type backend interface {
+	fetch(ctx context.Context, name, version, path string) (string, error)
+}
+
+func newStore(
+	b backend,
+	logger *logp.Logger,
+	cacheExpiration time.Duration,
+) (*Store, error) {
+	if cacheExpiration < 0 {
 		return nil, errInit
 	}
-	logger := logp.NewLogger(logs.Sourcemap)
+
 	return &Store{
-		cache:   gocache.New(expiration, cleanupInterval(expiration)),
-		esStore: &esStore{client: client, index: index, logger: logger},
-		logger:  logger,
+		cache:    gocache.New(cacheExpiration, cleanupInterval(cacheExpiration)),
+		backend:  b,
+		logger:   logger,
+		inflight: make(map[string]chan struct{}),
 	}, nil
 }

 // Fetch a sourcemap from the store.
-func (s *Store) Fetch(ctx context.Context, name string, version string, path string) (*sourcemap.Consumer, error) {
-	key := key([]string{name, version, path})
+func (s *Store) Fetch(ctx context.Context, name, version, path string) (*sourcemap.Consumer, error) {
+	key := cacheKey([]string{name, version, path})

 	// fetch from cache
 	if val, found := s.cache.Get(key); found {
@@ -73,10 +81,43 @@ func (s *Store) Fetch(ctx context.Context, name string, version string, path str
 		return consumer, nil
 	}

+	// if the value hasn't been found, check to see if there's an inflight
+	// request to update the value.
+	s.mu.Lock()
+	wait, ok := s.inflight[key]
+	if ok {
+		// found an inflight request, wait for it to complete.
+		s.mu.Unlock()
+
+		select {
+		case <-wait:
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		}
+		// Try to read the value again
+		return s.Fetch(ctx, name, version, path)
+	}
+
+	// no inflight request found, add a channel to the map and then
+	// make the fetch request.
+	wait = make(chan struct{})
+	s.inflight[key] = wait
+
+	s.mu.Unlock()
+
+	// Once the fetch request is complete, close and remove the channel
+	// from the synchronization map.
+	defer func() {
+		s.mu.Lock()
+		delete(s.inflight, key)
+		close(wait)
+		s.mu.Unlock()
+	}()
+
 	// fetch from Elasticsearch and ensure caching for all non-temporary results
-	sourcemapStr, err := s.esStore.fetch(ctx, name, version, path)
+	sourcemapStr, err := s.backend.fetch(ctx, name, version, path)
 	if err != nil {
-		if !strings.Contains(err.Error(), errMsgESFailure) {
+		if !strings.Contains(err.Error(), "failure querying") {
 			s.add(key, nil)
 		}
 		return nil, err
@@ -93,6 +134,7 @@ func (s *Store) Fetch(ctx context.Context, name string, version string, path str
 		return nil, errors.Wrap(err, errMsgParseSourcemap)
 	}
 	s.add(key, consumer)
+
 	return consumer, nil
 }

@@ -102,7 +144,7 @@ func (s *Store) Added(ctx context.Context, name string, version string, path str
 		s.logger.Warnf("Overriding sourcemap for service %s version %s and file %s",
 			name, version, path)
 	}
-	key := key([]string{name, version, path})
+	key := cacheKey([]string{name, version, path})
 	s.cache.Delete(key)
 	if !s.logger.IsDebug() {
 		return
@@ -118,7 +160,7 @@ func (s *Store) add(key string, consumer *sourcemap.Consumer) {
 	s.logger.Debugf("Added id %v. Cache now has %v entries.", key, s.cache.ItemCount())
 }

-func key(s []string) string {
+func cacheKey(s []string) string {
 	return strings.Join(s, "_")
 }
diff --git a/sourcemap/store_test.go b/sourcemap/store_test.go
index 6c18722166e..ed2782d4438 100644
--- a/sourcemap/store_test.go
+++ b/sourcemap/store_test.go
@@ -18,8 +18,14 @@
 package sourcemap

 import (
+	"compress/zlib"
 	"context"
+	"errors"
 	"fmt"
+	"net/http"
+	"net/http/httptest"
+	"sync"
+	"sync/atomic"
 	"testing"
 	"time"

@@ -28,16 +34,21 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"

+	"github.com/elastic/apm-server/beater/config"
 	"github.com/elastic/apm-server/elasticsearch"
+	logs "github.com/elastic/apm-server/log"
+	"github.com/elastic/beats/v7/libbeat/logp"

 	"github.com/elastic/apm-server/sourcemap/test"
 )

-func Test_NewStore(t *testing.T) {
-	_, err := NewStore(nil, "", -1)
+func Test_newStore(t *testing.T) {
+	logger := logp.NewLogger(logs.Sourcemap)
+
+	_, err := newStore(nil, logger, -1)
 	require.Error(t, err)

-	f, err := NewStore(nil, "", 100)
+	f, err := newStore(nil, logger, 100)
 	require.NoError(t, err)
 	assert.NotNil(t, f.cache)
 }

@@ -142,6 +153,126 @@ func TestStore_Fetch(t *testing.T) {
 	})
 }

+func TestFetchTimeout(t *testing.T) {
+	var (
+		errs int64
+
+		apikey  = "supersecret"
+		name    = "webapp"
+		version = "1.0.0"
+		path    = "/my/path/to/bundle.js.map"
+		c       = http.DefaultClient
+	)
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		<-r.Context().Done()
+	}))
+	defer ts.Close()
+
+	fleetCfg := &config.Fleet{
+		Hosts:        []string{ts.URL},
+		Protocol:     "https",
+		AccessAPIKey: apikey,
+		TLS:          nil,
+	}
+	cfgs := []config.SourceMapMetadata{
+		{
+			ServiceName:    name,
+			ServiceVersion: version,
+			BundleFilepath: path,
+			SourceMapURL:   "",
+		},
+	}
+	b, err := newFleetStore(c, fleetCfg, cfgs)
+	assert.NoError(t, err)
+	logger := logp.NewLogger(logs.Sourcemap)
+	store, err := newStore(b, logger, time.Minute)
+	assert.NoError(t, err)
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
+	defer cancel()
+
+	_, err = store.Fetch(ctx, name, version, path)
+	assert.True(t, errors.Is(err, context.DeadlineExceeded))
+	atomic.AddInt64(&errs, 1)
+
+	assert.Equal(t, int64(1), errs)
+}
+
+func TestConcurrentFetch(t *testing.T) {
+	for _, tc := range []struct {
+		calledWant, errWant, succsWant int64
+	}{
+		{calledWant: 1, errWant: 0, succsWant: 10},
+		{calledWant: 2, errWant: 1, succsWant: 9},
+		{calledWant: 4, errWant: 3, succsWant: 7},
+	} {
+		var (
+			called, errs, succs int64
+
+			apikey  = "supersecret"
+			name    = "webapp"
+			version = "1.0.0"
+			path    = "/my/path/to/bundle.js.map"
+			c       = http.DefaultClient
+
+			errsLeft = tc.errWant
+		)
+
+		ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			atomic.AddInt64(&called, 1)
+			// Simulate the wait for a network request.
+			time.Sleep(50 * time.Millisecond)
+			if errsLeft > 0 {
+				errsLeft--
+				http.Error(w, "err", http.StatusInternalServerError)
+				return
+			}
+			wr := zlib.NewWriter(w)
+			defer wr.Close()
+			wr.Write([]byte(test.ValidSourcemap))
+		}))
+		defer ts.Close()
+
+		fleetCfg := &config.Fleet{
+			Hosts:        []string{ts.URL},
+			Protocol:     "https",
+			AccessAPIKey: apikey,
+			TLS:          nil,
+		}
+		cfgs := []config.SourceMapMetadata{
+			{
+				ServiceName:    name,
+				ServiceVersion: version,
+				BundleFilepath: path,
+				SourceMapURL:   "",
+			},
+		}
+		store, err := NewFleetStore(c, fleetCfg, cfgs, time.Minute)
+		assert.NoError(t, err)
+
+		var wg sync.WaitGroup
+		for i := 0; i < int(tc.succsWant+tc.errWant); i++ {
+			wg.Add(1)
+			go func() {
+				consumer, err := store.Fetch(context.Background(), name, version, path)
+				if err != nil {
+					atomic.AddInt64(&errs, 1)
+				} else {
+					assert.NotNil(t, consumer)
+					atomic.AddInt64(&succs, 1)
+				}
+
+				wg.Done()
+			}()
+		}
+
+		wg.Wait()
+		assert.Equal(t, tc.errWant, errs)
+		assert.Equal(t, tc.calledWant, called)
+		assert.Equal(t, tc.succsWant, succs)
+	}
+}
+
 func TestStore_Added(t *testing.T) {
 	name, version, path := "foo", "1.0.1", "/tmp"
 	key := "foo_1.0.1_/tmp"
@@ -202,7 +333,7 @@ func TestCleanupInterval(t *testing.T) {
 }

 func testStore(t *testing.T, client elasticsearch.Client) *Store {
-	store, err := NewStore(client, "apm-*sourcemap*", time.Minute)
+	store, err := NewElasticsearchStore(client, "apm-*sourcemap*", time.Minute)
 	require.NoError(t, err)
 	return store
 }
diff --git a/tests/system/test_integration_sourcemap.py b/tests/system/test_integration_sourcemap.py
index fef3f21bb64..97534fa2a6a 100644
--- a/tests/system/test_integration_sourcemap.py
+++ b/tests/system/test_integration_sourcemap.py
@@ -208,29 +208,6 @@ def test_rum_transaction(self):
         assert frames_checked > 0, "no frames checked"


-@integration_test
-class SourcemapInvalidESConfig(BaseSourcemapTest):
-    def config(self):
-        cfg = super(SourcemapInvalidESConfig, self).config()
-        url = self.split_url(cfg)
-        cfg.update({
-            "smap_es_host": url["host"],
-            "smap_es_username": url["username"],
-            "smap_es_password": "xxxx",
-        })
-        return cfg
-
-    def test_unauthorized(self):
-        # successful - uses output.elasticsearch.* configuration
-        self.upload_sourcemap()
-        # unauthorized - uses apm-server.rum.sourcemapping.elasticsearch configuration
-        self.load_docs_with_template(self.get_error_payload_path(),
-                                     self.intake_url,
-                                     'error',
-                                     1)
-        assert self.log_contains("unable to authenticate user")
-
-
 @integration_test
 class SourcemapESConfigUser(BaseSourcemapTest):
     def config(self):
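The inflight map added to Store.Fetch is effectively a single-flight guard: the first caller for a given key registers a channel, later callers block on that channel (or on their context), and once the first fetch completes they re-enter Fetch and are served from the cache. That is what the TestConcurrentFetch table encodes, for example ten concurrent fetches producing exactly one backend call when no errors occur. A standalone sketch of the same pattern, independent of the apm-server types (the names here are illustrative, not from the patch, and only the standard context and sync packages are assumed):

    // dedup collapses concurrent calls for the same key into one execution of fn.
    // inflight must be initialized before use, e.g. make(map[string]chan struct{}).
    type dedup struct {
        mu       sync.Mutex
        inflight map[string]chan struct{}
    }

    func (d *dedup) do(ctx context.Context, key string, fn func() error) error {
        d.mu.Lock()
        if wait, ok := d.inflight[key]; ok {
            // Another goroutine is already doing the work; wait for it to finish,
            // then let the caller re-check its cache (Store.Fetch re-enters itself here).
            d.mu.Unlock()
            select {
            case <-wait:
                return nil
            case <-ctx.Done():
                return ctx.Err()
            }
        }
        wait := make(chan struct{})
        d.inflight[key] = wait
        d.mu.Unlock()

        // Remove and close the channel once the work is done, releasing any waiters.
        defer func() {
            d.mu.Lock()
            delete(d.inflight, key)
            close(wait)
            d.mu.Unlock()
        }()
        return fn()
    }

One consequence, visible in the real implementation as well, is that an error is not broadcast to waiters; they simply re-enter Fetch once the channel closes, which keeps all error handling on the single fetching goroutine.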