From 7b67f5549d00bae9c75041c8f9e195a2a15f39d6 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Wed, 7 Jul 2021 17:26:43 +0800 Subject: [PATCH 1/4] Move source mapping to model processing Source mapping transformation is rewritten as a model.BatchProcessor, moved out of the model package and into the sourcemap package. One side-effect of the move to a process, worth debating, is that source mapping now occurs in the HTTP handler goroutine rather than in the publisher, which could increase request latency slightly. On the other hand this means that source mapping for more clients than there are CPUs can now happen concurrently (the publisher is limited to the number of CPUs in this regard); and the handlers will block for up to one second anyway if the publisher is busy/queue is full. If a stack frame cannot be mapped, we no longer set `sourcemap.error` or log anything. Just because RUM and source mapping is enabled, does not mean that all stacks _must_ be source mapped; therefore these "errors" are mostly just noise. Likewise we now only set `sourcemap.updated` when it is true. --- beater/api/asset/sourcemap/handler.go | 11 +- beater/api/asset/sourcemap/handler_test.go | 12 +- beater/api/mux.go | 16 +- beater/api/mux_test.go | 13 +- beater/beater.go | 29 +- beater/beater_test.go | 35 +- beater/http.go | 4 +- beater/server.go | 20 +- beater/tracing.go | 1 + changelogs/head.asciidoc | 1 + model/error.go | 21 +- model/error_test.go | 92 +--- model/sourcemap.go | 9 - model/sourcemap_test.go | 57 --- model/span.go | 2 +- model/span_test.go | 11 +- model/stacktrace.go | 155 ++++-- model/stacktrace_frame.go | 231 --------- model/stacktrace_frame_test.go | 428 ----------------- model/stacktrace_test.go | 453 ++++++++++-------- sourcemap/processor.go | 171 +++++++ sourcemap/processor_test.go | 222 +++++++++ sourcemap/store.go | 5 +- sourcemap/store_test.go | 2 +- .../TestNoMatchingSourcemap.approved.json | 8 - systemtest/sourcemap_test.go | 132 ++--- transform/transform.go | 3 - 27 files changed, 918 insertions(+), 1226 deletions(-) delete mode 100644 model/stacktrace_frame.go delete mode 100644 model/stacktrace_frame_test.go create mode 100644 sourcemap/processor.go create mode 100644 sourcemap/processor_test.go diff --git a/beater/api/asset/sourcemap/handler.go b/beater/api/asset/sourcemap/handler.go index 47a25e89688..eb61bd10e58 100644 --- a/beater/api/asset/sourcemap/handler.go +++ b/beater/api/asset/sourcemap/handler.go @@ -18,6 +18,7 @@ package sourcemap import ( + "context" "errors" "fmt" "io/ioutil" @@ -46,8 +47,14 @@ var ( validateError = monitoring.NewInt(registry, "validation.errors") ) +// AddedNotifier is an interface for notifying of sourcemap additions. +// This is implemented by sourcemap.Store. +type AddedNotifier interface { + NotifyAdded(ctx context.Context, serviceName, serviceVersion, bundleFilepath string) +} + // Handler returns a request.Handler for managing asset requests. -func Handler(report publish.Reporter) request.Handler { +func Handler(report publish.Reporter, notifier AddedNotifier) request.Handler { return func(c *request.Context) { if c.Request.Method != "POST" { c.Result.SetDefault(request.IDResponseErrorsMethodNotAllowed) @@ -99,7 +106,9 @@ func Handler(report publish.Reporter) request.Handler { c.Result.SetWithError(request.IDResponseErrorsFullQueue, err) } c.Write() + return } + notifier.NotifyAdded(c.Request.Context(), smap.ServiceName, smap.ServiceVersion, smap.BundleFilepath) c.Result.SetDefault(request.IDResponseValidAccepted) c.Write() } diff --git a/beater/api/asset/sourcemap/handler_test.go b/beater/api/asset/sourcemap/handler_test.go index 80304279f86..e2cbebdf143 100644 --- a/beater/api/asset/sourcemap/handler_test.go +++ b/beater/api/asset/sourcemap/handler_test.go @@ -42,6 +42,14 @@ import ( "github.com/elastic/apm-server/transform" ) +type notifier struct { + notified bool +} + +func (n *notifier) NotifyAdded(ctx context.Context, serviceName, serviceVersion, bundleFilepath string) { + n.notified = true +} + func TestAssetHandler(t *testing.T) { testcases := map[string]testcaseT{ "method": { @@ -111,6 +119,7 @@ func TestAssetHandler(t *testing.T) { // test assertion assert.Equal(t, tc.code, tc.w.Code) assert.Equal(t, tc.body, tc.w.Body.String()) + assert.Equal(t, tc.code == http.StatusAccepted, tc.notifier.notified) }) } } @@ -122,6 +131,7 @@ type testcaseT struct { contentType string reporter func(ctx context.Context, p publish.PendingReq) error authorizer authorizerFunc + notifier notifier missingSourcemap, missingServiceName, missingServiceVersion, missingBundleFilepath bool @@ -178,7 +188,7 @@ func (tc *testcaseT) setup() error { } c := request.NewContext() c.Reset(tc.w, tc.r) - h := Handler(tc.reporter) + h := Handler(tc.reporter, &tc.notifier) h(c) return nil } diff --git a/beater/api/mux.go b/beater/api/mux.go index 4399bfabfa8..e0cefcaa982 100644 --- a/beater/api/mux.go +++ b/beater/api/mux.go @@ -27,7 +27,7 @@ import ( "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/apm-server/agentcfg" - "github.com/elastic/apm-server/beater/api/asset/sourcemap" + apisourcemap "github.com/elastic/apm-server/beater/api/asset/sourcemap" "github.com/elastic/apm-server/beater/api/config/agent" "github.com/elastic/apm-server/beater/api/intake" "github.com/elastic/apm-server/beater/api/profile" @@ -42,6 +42,7 @@ import ( "github.com/elastic/apm-server/model/modelprocessor" "github.com/elastic/apm-server/processor/stream" "github.com/elastic/apm-server/publish" + "github.com/elastic/apm-server/sourcemap" ) const ( @@ -84,6 +85,7 @@ func NewMux( batchProcessor model.BatchProcessor, fetcher agentcfg.Fetcher, ratelimitStore *ratelimit.Store, + sourcemapStore *sourcemap.Store, ) (*http.ServeMux, error) { pool := request.NewContextPool() mux := http.NewServeMux() @@ -101,6 +103,7 @@ func NewMux( reporter: report, batchProcessor: batchProcessor, ratelimitStore: ratelimitStore, + sourcemapStore: sourcemapStore, } type route struct { @@ -151,6 +154,7 @@ type routeBuilder struct { reporter publish.Reporter batchProcessor model.BatchProcessor ratelimitStore *ratelimit.Store + sourcemapStore *sourcemap.Store } func (r *routeBuilder) profileHandler() (request.Handler, error) { @@ -178,6 +182,12 @@ func (r *routeBuilder) rumIntakeHandler(newProcessor func(*config.Config) *strea } return func() (request.Handler, error) { batchProcessor := r.batchProcessor + if r.sourcemapStore != nil { + batchProcessor = modelprocessor.Chained{ + sourcemap.BatchProcessor{Store: r.sourcemapStore}, + batchProcessor, + } + } batchProcessor = batchProcessorWithAllowedServiceNames(batchProcessor, r.cfg.RumConfig.AllowServiceNames) h := intake.Handler(newProcessor(r.cfg), requestMetadataFunc, batchProcessor) return middleware.Wrap(h, rumMiddleware(r.cfg, r.authenticator, r.ratelimitStore, intake.MonitoringMap)...) @@ -185,7 +195,7 @@ func (r *routeBuilder) rumIntakeHandler(newProcessor func(*config.Config) *strea } func (r *routeBuilder) sourcemapHandler() (request.Handler, error) { - h := sourcemap.Handler(r.reporter) + h := apisourcemap.Handler(r.reporter, r.sourcemapStore) return middleware.Wrap(h, sourcemapMiddleware(r.cfg, r.authenticator, r.ratelimitStore)...) } @@ -270,7 +280,7 @@ func sourcemapMiddleware(cfg *config.Config, auth *auth.Authenticator, ratelimit msg = "When APM Server is managed by Fleet, Sourcemaps must be uploaded directly to Elasticsearch." } enabled := cfg.RumConfig.Enabled && cfg.RumConfig.SourceMapping.Enabled && !cfg.DataStreams.Enabled - backendMiddleware := backendMiddleware(cfg, auth, ratelimitStore, sourcemap.MonitoringMap) + backendMiddleware := backendMiddleware(cfg, auth, ratelimitStore, apisourcemap.MonitoringMap) return append(backendMiddleware, middleware.KillSwitchMiddleware(enabled, msg)) } diff --git a/beater/api/mux_test.go b/beater/api/mux_test.go index 5cf4dd9f6dd..b7cd13df409 100644 --- a/beater/api/mux_test.go +++ b/beater/api/mux_test.go @@ -34,6 +34,7 @@ import ( "github.com/elastic/apm-server/beater/request" "github.com/elastic/apm-server/model" "github.com/elastic/apm-server/publish" + "github.com/elastic/apm-server/sourcemap" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/monitoring" ) @@ -78,7 +79,11 @@ func requestToMuxer(cfg *config.Config, r *http.Request) (*httptest.ResponseReco nopReporter := func(context.Context, publish.PendingReq) error { return nil } nopBatchProcessor := model.ProcessBatchFunc(func(context.Context, *model.Batch) error { return nil }) ratelimitStore, _ := ratelimit.NewStore(1000, 1000, 1000) - mux, err := NewMux(beat.Info{Version: "1.2.3"}, cfg, nopReporter, nopBatchProcessor, agentcfg.NewFetcher(cfg), ratelimitStore) + var sourcemapStore *sourcemap.Store + mux, err := NewMux( + beat.Info{Version: "1.2.3"}, cfg, + nopReporter, nopBatchProcessor, agentcfg.NewFetcher(cfg), ratelimitStore, sourcemapStore, + ) if err != nil { return nil, err } @@ -114,7 +119,11 @@ func newTestMux(t *testing.T, cfg *config.Config) http.Handler { nopReporter := func(context.Context, publish.PendingReq) error { return nil } nopBatchProcessor := model.ProcessBatchFunc(func(context.Context, *model.Batch) error { return nil }) ratelimitStore, _ := ratelimit.NewStore(1000, 1000, 1000) - mux, err := NewMux(beat.Info{Version: "1.2.3"}, cfg, nopReporter, nopBatchProcessor, agentcfg.NewFetcher(cfg), ratelimitStore) + var sourcemapStore *sourcemap.Store + mux, err := NewMux( + beat.Info{Version: "1.2.3"}, cfg, + nopReporter, nopBatchProcessor, agentcfg.NewFetcher(cfg), ratelimitStore, sourcemapStore, + ) require.NoError(t, err) return mux } diff --git a/beater/beater.go b/beater/beater.go index b3357e3abb1..344095cb365 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -364,10 +364,7 @@ func (s *serverRunner) Start() { func (s *serverRunner) run() error { // Send config to telemetry. recordAPMServerConfig(s.config) - transformConfig, err := newTransformConfig(s.beat.Info, s.config, s.fleetConfig) - if err != nil { - return err - } + transformConfig := newTransformConfig(s.beat.Info, s.config, s.fleetConfig) publisherConfig := &publish.PublisherConfig{ Info: s.beat.Info, Pipeline: s.config.Pipeline, @@ -387,6 +384,15 @@ func (s *serverRunner) run() error { }() } + var sourcemapStore *sourcemap.Store + if s.config.RumConfig.Enabled && s.config.RumConfig.SourceMapping.Enabled { + store, err := newSourcemapStore(s.beat.Info, s.config.RumConfig.SourceMapping, s.fleetConfig) + if err != nil { + return err + } + sourcemapStore = store + } + // When the publisher stops cleanly it will close its pipeline client, // calling the acker's Close method. We need to call Open for each new // publisher to ensure we wait for all clients and enqueued events to @@ -432,6 +438,7 @@ func (s *serverRunner) run() error { Logger: s.logger, Tracer: s.tracer, BatchProcessor: batchProcessor, + SourcemapStore: sourcemapStore, }); err != nil { return err } @@ -618,24 +625,14 @@ func runServerWithTracerServer(runServer RunServerFunc, tracerServer *tracerServ } } -func newTransformConfig(beatInfo beat.Info, cfg *config.Config, fleetCfg *config.Fleet) (*transform.Config, error) { - transformConfig := &transform.Config{ +func newTransformConfig(beatInfo beat.Info, cfg *config.Config, fleetCfg *config.Fleet) *transform.Config { + return &transform.Config{ DataStreams: cfg.DataStreams.Enabled, RUM: transform.RUMConfig{ LibraryPattern: regexp.MustCompile(cfg.RumConfig.LibraryPattern), ExcludeFromGrouping: regexp.MustCompile(cfg.RumConfig.ExcludeFromGrouping), }, } - - if cfg.RumConfig.Enabled && cfg.RumConfig.SourceMapping.Enabled { - store, err := newSourcemapStore(beatInfo, cfg.RumConfig.SourceMapping, fleetCfg) - if err != nil { - return nil, err - } - transformConfig.RUM.SourcemapStore = store - } - - return transformConfig, nil } func newSourcemapStore(beatInfo beat.Info, cfg config.SourceMapping, fleetCfg *config.Fleet) (*sourcemap.Store, error) { diff --git a/beater/beater_test.go b/beater/beater_test.go index 71ac57f9d10..4c6c7025797 100644 --- a/beater/beater_test.go +++ b/beater/beater_test.go @@ -252,10 +252,9 @@ func TestTransformConfigIndex(t *testing.T) { cfg.RumConfig.SourceMapping.IndexPattern = indexPattern } - transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg, nil) + store, err := newSourcemapStore(beat.Info{Version: "1.2.3"}, cfg.RumConfig.SourceMapping, nil) require.NoError(t, err) - require.NotNil(t, transformConfig.RUM.SourcemapStore) - transformConfig.RUM.SourcemapStore.Added(context.Background(), "name", "version", "path") + store.NotifyAdded(context.Background(), "name", "version", "path") require.Len(t, requestPaths, 1) path := requestPaths[0] @@ -267,26 +266,6 @@ func TestTransformConfigIndex(t *testing.T) { t.Run("with-observer-version", func(t *testing.T) { test(t, "blah-%{[observer.version]}-blah", "blah-1.2.3-blah") }) } -func TestTransformConfig(t *testing.T) { - test := func(rumEnabled, sourcemapEnabled bool, expectSourcemapStore bool) { - cfg := config.DefaultConfig() - cfg.RumConfig.Enabled = rumEnabled - cfg.RumConfig.SourceMapping.Enabled = sourcemapEnabled - transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg, nil) - require.NoError(t, err) - if expectSourcemapStore { - assert.NotNil(t, transformConfig.RUM.SourcemapStore) - } else { - assert.Nil(t, transformConfig.RUM.SourcemapStore) - } - } - - test(false, false, false) - test(false, true, false) - test(true, false, false) - test(true, true, true) -} - func TestStoreUsesRUMElasticsearchConfig(t *testing.T) { var called bool ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -301,11 +280,11 @@ func TestStoreUsesRUMElasticsearchConfig(t *testing.T) { cfg.RumConfig.SourceMapping.ESConfig = elasticsearch.DefaultConfig() cfg.RumConfig.SourceMapping.ESConfig.Hosts = []string{ts.URL} - transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg, nil) + store, err := newSourcemapStore(beat.Info{Version: "1.2.3"}, cfg.RumConfig.SourceMapping, nil) require.NoError(t, err) // Check that the provided rum elasticsearch config was used and // Fetch() goes to the test server. - _, err = transformConfig.RUM.SourcemapStore.Fetch(context.Background(), "app", "1.0", "/bundle/path") + _, err = store.Fetch(context.Background(), "app", "1.0", "/bundle/path") require.NoError(t, err) assert.True(t, called) @@ -338,11 +317,9 @@ func TestFleetStoreUsed(t *testing.T) { TLS: nil, } - transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg, fleetCfg) + store, err := newSourcemapStore(beat.Info{Version: "1.2.3"}, cfg.RumConfig.SourceMapping, fleetCfg) require.NoError(t, err) - // Check that the provided rum elasticsearch config was used and - // Fetch() goes to the test server. - _, err = transformConfig.RUM.SourcemapStore.Fetch(context.Background(), "app", "1.0", "/bundle/path") + _, err = store.Fetch(context.Background(), "app", "1.0", "/bundle/path") require.NoError(t, err) assert.True(t, called) diff --git a/beater/http.go b/beater/http.go index 0bef2f45c37..76d2de85677 100644 --- a/beater/http.go +++ b/beater/http.go @@ -34,6 +34,7 @@ import ( "github.com/elastic/apm-server/model" "github.com/elastic/apm-server/model/modelprocessor" "github.com/elastic/apm-server/publish" + "github.com/elastic/apm-server/sourcemap" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" "github.com/elastic/beats/v7/libbeat/logp" @@ -57,6 +58,7 @@ func newHTTPServer( batchProcessor model.BatchProcessor, agentcfgFetcher agentcfg.Fetcher, ratelimitStore *ratelimit.Store, + sourcemapStore *sourcemap.Store, ) (*httpServer, error) { // Add a model processor that rate limits, and checks authorization for the agent and service for each event. @@ -66,7 +68,7 @@ func newHTTPServer( batchProcessor, } - mux, err := api.NewMux(info, cfg, reporter, batchProcessor, agentcfgFetcher, ratelimitStore) + mux, err := api.NewMux(info, cfg, reporter, batchProcessor, agentcfgFetcher, ratelimitStore, sourcemapStore) if err != nil { return nil, err } diff --git a/beater/server.go b/beater/server.go index 9ab03633c96..bc252411bc4 100644 --- a/beater/server.go +++ b/beater/server.go @@ -42,6 +42,7 @@ import ( "github.com/elastic/apm-server/model" "github.com/elastic/apm-server/model/modelprocessor" "github.com/elastic/apm-server/publish" + "github.com/elastic/apm-server/sourcemap" ) // RunServerFunc is a function which runs the APM Server until a @@ -69,6 +70,10 @@ type ServerParams struct { // for self-instrumentation. Tracer *apm.Tracer + // SourcemapStore holds a sourcemap.Store, or nil if source + // mapping is disabled. + SourcemapStore *sourcemap.Store + // BatchProcessor is the model.BatchProcessor that is used // for publishing events to the output, such as Elasticsearch. BatchProcessor model.BatchProcessor @@ -84,7 +89,15 @@ type ServerParams struct { // should remove the reporter parameter. func newBaseRunServer(reporter publish.Reporter) RunServerFunc { return func(ctx context.Context, args ServerParams) error { - srv, err := newServer(args.Logger, args.Info, args.Config, args.Tracer, reporter, args.BatchProcessor) + srv, err := newServer( + args.Logger, + args.Info, + args.Config, + args.Tracer, + reporter, + args.SourcemapStore, + args.BatchProcessor, + ) if err != nil { return err } @@ -118,6 +131,7 @@ func newServer( cfg *config.Config, tracer *apm.Tracer, reporter publish.Reporter, + sourcemapStore *sourcemap.Store, batchProcessor model.BatchProcessor, ) (server, error) { agentcfgFetchReporter := agentcfg.NewReporter(agentcfg.NewFetcher(cfg), batchProcessor, 30*time.Second) @@ -129,7 +143,9 @@ func newServer( if err != nil { return server{}, err } - httpServer, err := newHTTPServer(logger, info, cfg, tracer, reporter, batchProcessor, agentcfgFetchReporter, ratelimitStore) + httpServer, err := newHTTPServer( + logger, info, cfg, tracer, reporter, batchProcessor, agentcfgFetchReporter, ratelimitStore, sourcemapStore, + ) if err != nil { return server{}, err } diff --git a/beater/tracing.go b/beater/tracing.go index e24dfc5031b..6e4a1975168 100644 --- a/beater/tracing.go +++ b/beater/tracing.go @@ -71,6 +71,7 @@ func newTracerServer(listener net.Listener, logger *logp.Logger) (*tracerServer, processBatch, agentcfg.NewFetcher(cfg), ratelimitStore, + nil, // no sourcemap store ) if err != nil { return nil, err diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc index e5569058851..7d4e4ab6d91 100644 --- a/changelogs/head.asciidoc +++ b/changelogs/head.asciidoc @@ -7,6 +7,7 @@ https://github.com/elastic/apm-server/compare/7.13\...master[View commits] ==== Breaking Changes * Removed monitoring counters `apm-server.processor.stream.errors.{queue,server,closed}` {pull}5453[5453] * APM Server now responds with 403 (HTTP) and PermissionDenied (gRPC) for authenticated but unauthorized requests {pull}5545[5545] +* `sourcemap.error` and `sourcemap.updated` are no longer set due to failing to find a matching source map {pull}5631[5631] [float] ==== Bug fixes diff --git a/model/error.go b/model/error.go index 5e585714795..2c59d0bbe57 100644 --- a/model/error.go +++ b/model/error.go @@ -168,15 +168,18 @@ func (e *Error) fields(ctx context.Context, cfg *transform.Config) common.MapStr } fields.maybeSetMapStr("log", e.logFields(ctx, cfg)) - e.updateCulprit(cfg) + e.updateCulprit() fields.maybeSetString("culprit", e.Culprit) fields.maybeSetMapStr("custom", customFields(e.Custom)) fields.maybeSetString("grouping_key", e.calcGroupingKey(exceptionChain)) return common.MapStr(fields) } -func (e *Error) updateCulprit(cfg *transform.Config) { - if cfg.RUM.SourcemapStore == nil { +// TODO(axw) introduce another processor which sets library_frame +// and exclude_from_grouping, only applied for RUM. Then we get rid +// of Error.RUM and Span.RUM. +func (e *Error) updateCulprit() { + if !e.RUM { return } var fr *StacktraceFrame @@ -203,7 +206,7 @@ func (e *Error) updateCulprit(cfg *transform.Config) { func findSmappedNonLibraryFrame(frames []*StacktraceFrame) *StacktraceFrame { for _, fr := range frames { - if fr.IsSourcemapApplied() && !fr.IsLibraryFrame() { + if fr.SourcemapUpdated && !fr.IsLibraryFrame() { return fr } } @@ -236,8 +239,12 @@ func (e *Error) exceptionFields(ctx context.Context, cfg *transform.Config, chai ex.set("code", code.String()) } - if st := exception.Stacktrace.transform(ctx, cfg, e.RUM, &e.Metadata.Service); len(st) > 0 { - ex.set("stacktrace", st) + if n := len(exception.Stacktrace); n > 0 { + frames := make([]common.MapStr, n) + for i, frame := range exception.Stacktrace { + frames[i] = frame.transform(cfg, e.RUM) + } + ex.set("stacktrace", frames) } result = append(result, common.MapStr(ex)) @@ -254,7 +261,7 @@ func (e *Error) logFields(ctx context.Context, cfg *transform.Config) common.Map log.maybeSetString("param_message", e.Log.ParamMessage) log.maybeSetString("logger_name", e.Log.LoggerName) log.maybeSetString("level", e.Log.Level) - if st := e.Log.Stacktrace.transform(ctx, cfg, e.RUM, &e.Metadata.Service); len(st) > 0 { + if st := e.Log.Stacktrace.transform(ctx, cfg, e.RUM); len(st) > 0 { log.set("stacktrace", st) } return common.MapStr(log) diff --git a/model/error_test.go b/model/error_test.go index 781b20f79f1..9806b9fc41a 100644 --- a/model/error_test.go +++ b/model/error_test.go @@ -30,9 +30,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/elastic/apm-server/sourcemap" - "github.com/elastic/apm-server/sourcemap/test" - "github.com/elastic/apm-server/tests" "github.com/elastic/apm-server/transform" "github.com/elastic/beats/v7/libbeat/common" @@ -234,10 +231,6 @@ func TestEventFields(t *testing.T) { "stacktrace": []common.MapStr{{ "filename": "st file", "exclude_from_grouping": false, - "sourcemap": common.MapStr{ - "error": "Colno mandatory for sourcemapping.", - "updated": false, - }, }}, "code": "13", "message": "exception message", @@ -259,9 +252,7 @@ func TestEventFields(t *testing.T) { for name, tc := range tests { t.Run(name, func(t *testing.T) { - output := tc.Error.appendBeatEvents(context.Background(), &transform.Config{ - RUM: transform.RUMConfig{SourcemapStore: &sourcemap.Store{}}, - }, nil) + output := tc.Error.appendBeatEvents(context.Background(), &transform.Config{}, nil) require.Len(t, output, 1) fields := output[0].Fields["error"] assert.Equal(t, tc.Output, fields) @@ -390,10 +381,6 @@ func TestEvents(t *testing.T) { "stacktrace": []common.MapStr{{ "exclude_from_grouping": false, "filename": "myFile", - "sourcemap": common.MapStr{ - "error": "Colno mandatory for sourcemapping.", - "updated": false, - }, }}, }}, "page": common.MapStr{"url": url, "referer": referer}, @@ -413,7 +400,6 @@ func TestEvents(t *testing.T) { t.Run(name, func(t *testing.T) { outputEvents := tc.Error.appendBeatEvents(context.Background(), &transform.Config{ DataStreams: true, - RUM: transform.RUMConfig{SourcemapStore: &sourcemap.Store{}}, }, nil) require.Len(t, outputEvents, 1) outputEvent := outputEvents[0] @@ -433,110 +419,104 @@ func TestCulprit(t *testing.T) { } stUpdate := Stacktrace{ &StacktraceFrame{Filename: "a", Function: fct}, - &StacktraceFrame{Filename: "a", LibraryFrame: &truthy, SourcemapUpdated: &truthy}, - &StacktraceFrame{Filename: "f", Function: fct, SourcemapUpdated: &truthy}, - &StacktraceFrame{Filename: "bar", Function: fct, SourcemapUpdated: &truthy}, + &StacktraceFrame{Filename: "a", LibraryFrame: &truthy, SourcemapUpdated: true}, + &StacktraceFrame{Filename: "f", Function: fct, SourcemapUpdated: true}, + &StacktraceFrame{Filename: "bar", Function: fct, SourcemapUpdated: true}, } - store := &sourcemap.Store{} tests := []struct { event Error - config transform.Config culprit string msg string }{ { - event: Error{Culprit: c}, - config: transform.Config{}, + event: Error{Culprit: c, RUM: false}, culprit: "foo", - msg: "No Sourcemap in config", + msg: "Not a RUM event", }, { - event: Error{Culprit: c}, - config: transform.Config{RUM: transform.RUMConfig{SourcemapStore: store}}, + event: Error{Culprit: c, RUM: true}, culprit: "foo", msg: "No Stacktrace Frame given.", }, { - event: Error{Culprit: c, Log: &Log{Stacktrace: st}}, - config: transform.Config{RUM: transform.RUMConfig{SourcemapStore: store}}, + event: Error{Culprit: c, RUM: true, Log: &Log{Stacktrace: st}}, culprit: "foo", msg: "Log.StacktraceFrame has no updated frame", }, { event: Error{ Culprit: c, + RUM: true, Log: &Log{ Stacktrace: Stacktrace{ &StacktraceFrame{ Filename: "f", Classname: "xyz", - SourcemapUpdated: &truthy, + SourcemapUpdated: true, }, }, }, }, - config: transform.Config{RUM: transform.RUMConfig{SourcemapStore: store}}, culprit: "f", msg: "Adapt culprit to first valid Log.StacktraceFrame filename information.", }, { event: Error{ Culprit: c, + RUM: true, Log: &Log{ Stacktrace: Stacktrace{ &StacktraceFrame{ Classname: "xyz", - SourcemapUpdated: &truthy, + SourcemapUpdated: true, }, }, }, }, - config: transform.Config{RUM: transform.RUMConfig{SourcemapStore: store}}, culprit: "xyz", msg: "Adapt culprit Log.StacktraceFrame classname information.", }, { event: Error{ Culprit: c, + RUM: true, Exception: &Exception{Stacktrace: stUpdate}, }, - config: transform.Config{RUM: transform.RUMConfig{SourcemapStore: store}}, culprit: "f in fct", msg: "Adapt culprit to first valid Exception.StacktraceFrame information.", }, { event: Error{ Culprit: c, + RUM: true, Log: &Log{Stacktrace: st}, Exception: &Exception{Stacktrace: stUpdate}, }, - config: transform.Config{RUM: transform.RUMConfig{SourcemapStore: store}}, culprit: "f in fct", msg: "Log and Exception StacktraceFrame given, only one changes culprit.", }, { event: Error{ Culprit: c, + RUM: true, Log: &Log{ Stacktrace: Stacktrace{ &StacktraceFrame{ Filename: "a", Function: fct, - SourcemapUpdated: &truthy, + SourcemapUpdated: true, }, }, }, Exception: &Exception{Stacktrace: stUpdate}, }, - config: transform.Config{RUM: transform.RUMConfig{SourcemapStore: store}}, culprit: "a in fct", msg: "Log Stacktrace is prioritized over Exception StacktraceFrame", }, } for idx, test := range tests { t.Run(fmt.Sprint(idx), func(t *testing.T) { - - test.event.updateCulprit(&test.config) + test.event.updateCulprit() assert.Equal(t, test.culprit, test.event.Culprit, fmt.Sprintf("(%v) %s: expected <%v>, received <%v>", idx, test.msg, test.culprit, test.event.Culprit)) }) @@ -777,41 +757,3 @@ func md5With(args ...string) []byte { } return md5.Sum(nil) } - -func TestSourcemapping(t *testing.T) { - event := Error{ - Metadata: Metadata{ - Service: Service{ - Name: "foo", - Version: "bar", - }, - }, - Exception: &Exception{ - Message: "exception message", - Stacktrace: Stacktrace{ - &StacktraceFrame{ - Filename: "/a/b/c", - Lineno: tests.IntPtr(1), - Colno: tests.IntPtr(23), - AbsPath: "../a/b", - }, - }, - }, - RUM: true, - } - - // transform without sourcemap store - transformedNoSourcemap := event.fields(context.Background(), &transform.Config{}) - assert.Equal(t, 1, *event.Exception.Stacktrace[0].Lineno) - - // transform with sourcemap store - store, err := sourcemap.NewElasticsearchStore(test.ESClientWithValidSourcemap(t), "apm-*sourcemap*", time.Minute) - require.NoError(t, err) - transformedWithSourcemap := event.fields(context.Background(), &transform.Config{ - RUM: transform.RUMConfig{SourcemapStore: store}, - }) - assert.Equal(t, 5, *event.Exception.Stacktrace[0].Lineno) - - assert.NotEqual(t, transformedNoSourcemap["exception"], transformedWithSourcemap["exception"]) - assert.NotEqual(t, transformedNoSourcemap["grouping_key"], transformedWithSourcemap["grouping_key"]) -} diff --git a/model/sourcemap.go b/model/sourcemap.go index 1e3b30b95b2..8e1991ab68e 100644 --- a/model/sourcemap.go +++ b/model/sourcemap.go @@ -23,10 +23,8 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/monitoring" - logs "github.com/elastic/apm-server/log" "github.com/elastic/apm-server/transform" "github.com/elastic/apm-server/utility" ) @@ -54,13 +52,6 @@ func (pa *Sourcemap) Transform(ctx context.Context, cfg *transform.Config) []bea if pa == nil { return nil } - - if cfg.RUM.SourcemapStore == nil { - logp.NewLogger(logs.Sourcemap).Error("Sourcemap Accessor is nil, cache cannot be invalidated.") - } else { - cfg.RUM.SourcemapStore.Added(ctx, pa.ServiceName, pa.ServiceVersion, pa.BundleFilepath) - } - ev := beat.Event{ Fields: common.MapStr{ "processor": sourcemapProcessorEntry, diff --git a/model/sourcemap_test.go b/model/sourcemap_test.go index 6ddec9ec273..765447ad5b2 100644 --- a/model/sourcemap_test.go +++ b/model/sourcemap_test.go @@ -20,23 +20,15 @@ package model_test import ( "context" "io/ioutil" - "net/http" "testing" "time" - "go.uber.org/zap/zapcore" - s "github.com/go-sourcemap/sourcemap" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/apm-server/elasticsearch/estest" - logs "github.com/elastic/apm-server/log" "github.com/elastic/apm-server/model" - "github.com/elastic/apm-server/sourcemap" "github.com/elastic/apm-server/transform" ) @@ -76,52 +68,3 @@ func TestParseSourcemaps(t *testing.T) { assert.True(t, ok) assert.Equal(t, "webpack:///bundle.js", source) } - -func TestInvalidateCache(t *testing.T) { - event := model.Sourcemap{ServiceName: "service", ServiceVersion: "1", BundleFilepath: "js/bundle.js", Sourcemap: "testmap"} - t.Run("withSourcemapStore", func(t *testing.T) { - // collect logs - require.NoError(t, logp.DevelopmentSetup(logp.ToObserverOutput())) - - // create sourcemap store - client, err := estest.NewElasticsearchClient(estest.NewTransport(t, http.StatusOK, nil)) - require.NoError(t, err) - store, err := sourcemap.NewElasticsearchStore(client, "foo", time.Minute) - require.NoError(t, err) - - // transform with sourcemap store - event.Transform(context.Background(), &transform.Config{RUM: transform.RUMConfig{SourcemapStore: store}}) - - logCollection := logp.ObserverLogs().TakeAll() - assert.Equal(t, 2, len(logCollection)) - - // first sourcemap was added - for i, entry := range logCollection { - assert.Equal(t, logs.Sourcemap, entry.LoggerName) - assert.Equal(t, zapcore.DebugLevel, entry.Level) - if i == 0 { - assert.Contains(t, entry.Message, "Added id service_1_js/bundle.js. Cache now has 1 entries.") - } else { - assert.Contains(t, entry.Message, "Removed id service_1_js/bundle.js. Cache now has 0 entries.") - } - } - - }) - - t.Run("noSourcemapStore", func(t *testing.T) { - // collect logs - require.NoError(t, logp.DevelopmentSetup(logp.ToObserverOutput())) - - // transform with no sourcemap store - event.Transform(context.Background(), &transform.Config{RUM: transform.RUMConfig{}}) - - logCollection := logp.ObserverLogs().TakeAll() - assert.Equal(t, 1, len(logCollection)) - for _, entry := range logCollection { - assert.Equal(t, logs.Sourcemap, entry.LoggerName) - assert.Equal(t, zapcore.ErrorLevel, entry.Level) - assert.Contains(t, entry.Message, "cache cannot be invalidated") - } - - }) -} diff --git a/model/span.go b/model/span.go index 843ba4be7ff..1395f5fed0d 100644 --- a/model/span.go +++ b/model/span.go @@ -269,7 +269,7 @@ func (e *Span) fields(ctx context.Context, cfg *transform.Config) common.MapStr // TODO(axw) we should be using a merged service object, combining // the stream metadata and event-specific service info. - if st := e.Stacktrace.transform(ctx, cfg, e.RUM, &e.Metadata.Service); len(st) > 0 { + if st := e.Stacktrace.transform(ctx, cfg, e.RUM); len(st) > 0 { fields.set("stacktrace", st) } return common.MapStr(fields) diff --git a/model/span_test.go b/model/span_test.go index b73e5ae3fa8..9cea421a37f 100644 --- a/model/span_test.go +++ b/model/span_test.go @@ -26,7 +26,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/apm-server/sourcemap" "github.com/elastic/apm-server/transform" ) @@ -105,7 +104,6 @@ func TestSpanTransform(t *testing.T) { Outcome: "unknown", RepresentativeCount: 5, Duration: 1.20, - RUM: true, Stacktrace: Stacktrace{{AbsPath: path}}, Labels: common.MapStr{"label_a": 12}, HTTP: &HTTP{Method: method, StatusCode: statusCode, URL: url}, @@ -114,7 +112,8 @@ func TestSpanTransform(t *testing.T) { Statement: statement, Type: dbType, UserName: user, - RowsAffected: &rowsAffected}, + RowsAffected: &rowsAffected, + }, Destination: &Destination{Address: address, Port: port}, DestinationService: &DestinationService{ Type: destServiceType, @@ -137,10 +136,7 @@ func TestSpanTransform(t *testing.T) { "stacktrace": []common.MapStr{{ "exclude_from_grouping": false, "abs_path": path, - "sourcemap": common.MapStr{ - "error": "Colno mandatory for sourcemapping.", - "updated": false, - }}}, + }}, "db": common.MapStr{ "instance": instance, "statement": statement, @@ -182,7 +178,6 @@ func TestSpanTransform(t *testing.T) { for _, test := range tests { output := test.Span.appendBeatEvents(context.Background(), &transform.Config{ DataStreams: true, - RUM: transform.RUMConfig{SourcemapStore: &sourcemap.Store{}}, }, nil) fields := output[0].Fields assert.Equal(t, test.Output, fields, test.Msg) diff --git a/model/stacktrace.go b/model/stacktrace.go index ba15c8c2a52..95d294f9d36 100644 --- a/model/stacktrace.go +++ b/model/stacktrace.go @@ -19,75 +19,124 @@ package model import ( "context" + "regexp" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/logp" - - logs "github.com/elastic/apm-server/log" "github.com/elastic/apm-server/transform" ) type Stacktrace []*StacktraceFrame -func (st *Stacktrace) transform(ctx context.Context, cfg *transform.Config, rum bool, service *Service) []common.MapStr { - if st == nil { +type StacktraceFrame struct { + AbsPath string + Filename string + Classname string + Lineno *int + Colno *int + ContextLine string + Module string + Function string + LibraryFrame *bool + Vars common.MapStr + PreContext []string + PostContext []string + + ExcludeFromGrouping bool + + SourcemapUpdated bool + Original Original +} + +type Original struct { + AbsPath string + Filename string + Classname string + Lineno *int + Colno *int + Function string + LibraryFrame *bool +} + +func (st Stacktrace) transform(ctx context.Context, cfg *transform.Config, rum bool) []common.MapStr { + if len(st) == 0 { return nil } - // source map algorithm: - // apply source mapping frame by frame - // if no source map could be found, set updated to false and set sourcemap error - // otherwise use source map library for mapping and update - // - filename: only if it was found - // - function: - // * should be moved down one stack trace frame, - // * the function name of the first frame is set to - // * if one frame is not found in the source map, this frame is left out and - // the function name from the previous frame is used - // * if a mapping could be applied but no function name is found, the - // function name for the next frame is set to - // - colno - // - lineno - // - abs_path is set to the cleaned abs_path - // - sourcmeap.updated is set to true - - if !rum || cfg.RUM.SourcemapStore == nil { - return st.transformFrames(cfg, rum, noSourcemapping) + frames := make([]common.MapStr, len(st)) + for i, frame := range st { + frames[i] = frame.transform(cfg, rum) } - if service == nil || service.Name == "" || service.Version == "" { - return st.transformFrames(cfg, rum, noSourcemapping) + return frames +} + +func (s *StacktraceFrame) transform(cfg *transform.Config, rum bool) common.MapStr { + var m mapStr + m.maybeSetString("filename", s.Filename) + m.maybeSetString("classname", s.Classname) + m.maybeSetString("abs_path", s.AbsPath) + m.maybeSetString("module", s.Module) + m.maybeSetString("function", s.Function) + m.maybeSetMapStr("vars", s.Vars) + + if rum && cfg.RUM.LibraryPattern != nil { + s.setLibraryFrame(cfg.RUM.LibraryPattern) + } + if s.LibraryFrame != nil { + m.set("library_frame", *s.LibraryFrame) } - var errMsg string - var sourcemapErrorSet = map[string]interface{}{} - logger := logp.NewLogger(logs.Stacktrace) - fct := "" - return st.transformFrames(cfg, rum, func(frame *StacktraceFrame) { - fct, errMsg = frame.applySourcemap(ctx, cfg.RUM.SourcemapStore, service, fct) - if errMsg == "" || !logger.IsDebug() { - return - } - if _, ok := sourcemapErrorSet[errMsg]; !ok { - logger.Debug(errMsg) - sourcemapErrorSet[errMsg] = nil - } - }) -} + if rum && cfg.RUM.ExcludeFromGrouping != nil { + s.setExcludeFromGrouping(cfg.RUM.ExcludeFromGrouping) + } + m.set("exclude_from_grouping", s.ExcludeFromGrouping) -func (st *Stacktrace) transformFrames(cfg *transform.Config, rum bool, apply func(*StacktraceFrame)) []common.MapStr { - frameCount := len(*st) - if frameCount == 0 { - return nil + var context mapStr + if len(s.PreContext) > 0 { + context.set("pre", s.PreContext) } + if len(s.PostContext) > 0 { + context.set("post", s.PostContext) + } + m.maybeSetMapStr("context", common.MapStr(context)) + + var line mapStr + line.maybeSetIntptr("number", s.Lineno) + line.maybeSetIntptr("column", s.Colno) + line.maybeSetString("context", s.ContextLine) + m.maybeSetMapStr("line", common.MapStr(line)) - var fr *StacktraceFrame - frames := make([]common.MapStr, frameCount) - for idx := frameCount - 1; idx >= 0; idx-- { - fr = (*st)[idx] - apply(fr) - frames[idx] = fr.transform(cfg, rum) + var sm mapStr + if s.SourcemapUpdated { + sm.set("updated", true) } - return frames + m.maybeSetMapStr("sourcemap", common.MapStr(sm)) + + var orig mapStr + orig.maybeSetBool("library_frame", s.Original.LibraryFrame) + if s.SourcemapUpdated { + orig.maybeSetString("filename", s.Original.Filename) + orig.maybeSetString("classname", s.Original.Classname) + orig.maybeSetString("abs_path", s.Original.AbsPath) + orig.maybeSetString("function", s.Original.Function) + orig.maybeSetIntptr("colno", s.Original.Colno) + orig.maybeSetIntptr("lineno", s.Original.Lineno) + } + m.maybeSetMapStr("original", common.MapStr(orig)) + + return common.MapStr(m) +} + +func (s *StacktraceFrame) IsLibraryFrame() bool { + return s.LibraryFrame != nil && *s.LibraryFrame } -func noSourcemapping(_ *StacktraceFrame) {} +func (s *StacktraceFrame) setExcludeFromGrouping(pattern *regexp.Regexp) { + s.ExcludeFromGrouping = s.Filename != "" && pattern.MatchString(s.Filename) +} + +func (s *StacktraceFrame) setLibraryFrame(pattern *regexp.Regexp) { + s.Original.LibraryFrame = s.LibraryFrame + libraryFrame := (s.Filename != "" && pattern.MatchString(s.Filename)) || + (s.AbsPath != "" && pattern.MatchString(s.AbsPath)) + s.LibraryFrame = &libraryFrame +} diff --git a/model/stacktrace_frame.go b/model/stacktrace_frame.go deleted file mode 100644 index ecd204efa3a..00000000000 --- a/model/stacktrace_frame.go +++ /dev/null @@ -1,231 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package model - -import ( - "context" - "fmt" - "regexp" - - "github.com/elastic/beats/v7/libbeat/common" - - "github.com/elastic/apm-server/sourcemap" - "github.com/elastic/apm-server/transform" - "github.com/elastic/apm-server/utility" -) - -const ( - errMsgSourcemapColumnMandatory = "Colno mandatory for sourcemapping." - errMsgSourcemapLineMandatory = "Lineno mandatory for sourcemapping." - errMsgSourcemapPathMandatory = "AbsPath mandatory for sourcemapping." -) - -type StacktraceFrame struct { - AbsPath string - Filename string - Classname string - Lineno *int - Colno *int - ContextLine string - Module string - Function string - LibraryFrame *bool - Vars common.MapStr - PreContext []string - PostContext []string - - ExcludeFromGrouping bool - - SourcemapUpdated *bool - SourcemapError string - Original Original -} - -type Original struct { - AbsPath string - Filename string - Classname string - Lineno *int - Colno *int - Function string - LibraryFrame *bool - - sourcemapCopied bool -} - -func (s *StacktraceFrame) transform(cfg *transform.Config, rum bool) common.MapStr { - var m mapStr - m.maybeSetString("filename", s.Filename) - m.maybeSetString("classname", s.Classname) - m.maybeSetString("abs_path", s.AbsPath) - m.maybeSetString("module", s.Module) - m.maybeSetString("function", s.Function) - m.maybeSetMapStr("vars", s.Vars) - - if rum && cfg.RUM.LibraryPattern != nil { - s.setLibraryFrame(cfg.RUM.LibraryPattern) - } - if s.LibraryFrame != nil { - m.set("library_frame", *s.LibraryFrame) - } - - if rum && cfg.RUM.ExcludeFromGrouping != nil { - s.setExcludeFromGrouping(cfg.RUM.ExcludeFromGrouping) - } - m.set("exclude_from_grouping", s.ExcludeFromGrouping) - - var context mapStr - if len(s.PreContext) > 0 { - context.set("pre", s.PreContext) - } - if len(s.PostContext) > 0 { - context.set("post", s.PostContext) - } - m.maybeSetMapStr("context", common.MapStr(context)) - - var line mapStr - line.maybeSetIntptr("number", s.Lineno) - line.maybeSetIntptr("column", s.Colno) - line.maybeSetString("context", s.ContextLine) - m.maybeSetMapStr("line", common.MapStr(line)) - - var sm mapStr - sm.maybeSetBool("updated", s.SourcemapUpdated) - sm.maybeSetString("error", s.SourcemapError) - m.maybeSetMapStr("sourcemap", common.MapStr(sm)) - - var orig mapStr - orig.maybeSetBool("library_frame", s.Original.LibraryFrame) - if s.SourcemapUpdated != nil && *(s.SourcemapUpdated) { - orig.maybeSetString("filename", s.Original.Filename) - orig.maybeSetString("classname", s.Original.Classname) - orig.maybeSetString("abs_path", s.Original.AbsPath) - orig.maybeSetString("function", s.Original.Function) - orig.maybeSetIntptr("colno", s.Original.Colno) - orig.maybeSetIntptr("lineno", s.Original.Lineno) - } - m.maybeSetMapStr("original", common.MapStr(orig)) - - return common.MapStr(m) -} - -func (s *StacktraceFrame) IsLibraryFrame() bool { - return s.LibraryFrame != nil && *s.LibraryFrame -} - -func (s *StacktraceFrame) IsSourcemapApplied() bool { - return s.SourcemapUpdated != nil && *s.SourcemapUpdated -} - -func (s *StacktraceFrame) setExcludeFromGrouping(pattern *regexp.Regexp) { - s.ExcludeFromGrouping = s.Filename != "" && pattern.MatchString(s.Filename) -} - -func (s *StacktraceFrame) setLibraryFrame(pattern *regexp.Regexp) { - s.Original.LibraryFrame = s.LibraryFrame - libraryFrame := (s.Filename != "" && pattern.MatchString(s.Filename)) || - (s.AbsPath != "" && pattern.MatchString(s.AbsPath)) - s.LibraryFrame = &libraryFrame -} - -func (s *StacktraceFrame) applySourcemap(ctx context.Context, store *sourcemap.Store, service *Service, prevFunction string) (function string, errMsg string) { - function = prevFunction - - var valid bool - if valid, errMsg = s.validForSourcemapping(); !valid { - s.updateError(errMsg) - return - } - - s.setOriginalSourcemapData() - - path := utility.CleanUrlPath(s.Original.AbsPath) - mapper, err := store.Fetch(ctx, service.Name, service.Version, path) - if err != nil { - errMsg = err.Error() - return - } - if mapper == nil { - errMsg = fmt.Sprintf("No Sourcemap available for ServiceName %s, ServiceVersion %s, Path %s.", - service.Name, service.Version, path) - s.updateError(errMsg) - return - } - - file, fct, line, col, ctxLine, preCtx, postCtx, ok := sourcemap.Map(mapper, *s.Original.Lineno, *s.Original.Colno) - if !ok { - errMsg = fmt.Sprintf("No Sourcemap found for Lineno %v, Colno %v", *s.Original.Lineno, *s.Original.Colno) - s.updateError(errMsg) - return - } - - if file != "" { - s.Filename = file - } - s.Colno = &col - s.Lineno = &line - s.AbsPath = path - s.updateSmap(true) - s.Function = prevFunction - s.ContextLine = ctxLine - s.PreContext = preCtx - s.PostContext = postCtx - - if fct != "" { - function = fct - return - } - function = "" - return -} - -func (s *StacktraceFrame) validForSourcemapping() (bool, string) { - if s.Colno == nil { - return false, errMsgSourcemapColumnMandatory - } - if s.Lineno == nil { - return false, errMsgSourcemapLineMandatory - } - if s.AbsPath == "" { - return false, errMsgSourcemapPathMandatory - } - return true, "" -} - -func (s *StacktraceFrame) setOriginalSourcemapData() { - if s.Original.sourcemapCopied { - return - } - s.Original.Colno = s.Colno - s.Original.AbsPath = s.AbsPath - s.Original.Function = s.Function - s.Original.Lineno = s.Lineno - s.Original.Filename = s.Filename - s.Original.Classname = s.Classname - - s.Original.sourcemapCopied = true -} - -func (s *StacktraceFrame) updateError(errMsg string) { - s.SourcemapError = errMsg - s.updateSmap(false) -} - -func (s *StacktraceFrame) updateSmap(updated bool) { - s.SourcemapUpdated = &updated -} diff --git a/model/stacktrace_frame_test.go b/model/stacktrace_frame_test.go deleted file mode 100644 index 8c890b8afbb..00000000000 --- a/model/stacktrace_frame_test.go +++ /dev/null @@ -1,428 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package model - -import ( - "context" - "fmt" - "regexp" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/elastic/apm-server/elasticsearch" - - "github.com/elastic/beats/v7/libbeat/common" - - "github.com/elastic/apm-server/sourcemap" - "github.com/elastic/apm-server/sourcemap/test" - "github.com/elastic/apm-server/transform" -) - -func TestStacktraceFrameTransform(t *testing.T) { - filename, classname := "some file", "foo" - lineno := 1 - colno := 55 - path := "~/./some/abs_path" - context := "context" - fct := "some function" - module := "some_module" - libraryFrame := true - tests := []struct { - StFrame StacktraceFrame - Output common.MapStr - Msg string - }{ - { - StFrame: StacktraceFrame{Filename: filename, Lineno: &lineno}, - Output: common.MapStr{ - "filename": filename, - "line": common.MapStr{"number": lineno}, - "exclude_from_grouping": false, - }, - Msg: "Minimal StacktraceFrame", - }, - { - StFrame: StacktraceFrame{ - AbsPath: path, - Filename: filename, - Classname: classname, - Lineno: &lineno, - Colno: &colno, - ContextLine: context, - Module: module, - Function: fct, - LibraryFrame: &libraryFrame, - Vars: map[string]interface{}{"k1": "v1", "k2": "v2"}, - PreContext: []string{"prec1", "prec2"}, - PostContext: []string{"postc1", "postc2"}, - }, - Output: common.MapStr{ - "abs_path": "~/./some/abs_path", - "filename": "some file", - "classname": "foo", - "function": "some function", - "module": "some_module", - "library_frame": true, - "vars": common.MapStr{"k1": "v1", "k2": "v2"}, - "context": common.MapStr{ - "pre": []string{"prec1", "prec2"}, - "post": []string{"postc1", "postc2"}, - }, - "line": common.MapStr{ - "number": 1, - "column": 55, - "context": "context", - }, - "exclude_from_grouping": false, - }, - Msg: "Full StacktraceFrame", - }, - } - - for idx, test := range tests { - output := test.StFrame.transform(&transform.Config{}, true) - assert.Equal(t, test.Output, output, fmt.Sprintf("Failed at idx %v; %s", idx, test.Msg)) - } -} - -func TestSourcemap_Apply(t *testing.T) { - - name, version, col, line, path := "myservice", "2.1.4", 10, 15, "/../a/path" - validService := func() *Service { - return &Service{Name: name, Version: version} - } - validFrame := func() *StacktraceFrame { - return &StacktraceFrame{Colno: &col, Lineno: &line, AbsPath: path} - } - - t.Run("frame", func(t *testing.T) { - for name, tc := range map[string]struct { - frame *StacktraceFrame - - expectedErrorMsg string - }{ - "noColumn": { - frame: &StacktraceFrame{}, - expectedErrorMsg: "Colno mandatory"}, - "noLine": { - frame: &StacktraceFrame{Colno: &col}, - expectedErrorMsg: "Lineno mandatory"}, - "noPath": { - frame: &StacktraceFrame{Colno: &col, Lineno: &line}, - expectedErrorMsg: "AbsPath mandatory", - }, - } { - t.Run(name, func(t *testing.T) { - function, msg := tc.frame.applySourcemap(context.Background(), &sourcemap.Store{}, validService(), "foo") - assert.Equal(t, "foo", function) - assert.Contains(t, msg, tc.expectedErrorMsg) - assert.Equal(t, new(bool), tc.frame.SourcemapUpdated) - require.NotNil(t, tc.frame.SourcemapError) - assert.Contains(t, tc.frame.SourcemapError, msg) - assert.Zero(t, tc.frame.Original) - }) - } - }) - - t.Run("errorPerFrame", func(t *testing.T) { - for name, tc := range map[string]struct { - store *sourcemap.Store - expectedErrorMsg string - }{ - "noSourcemap": {store: testSourcemapStore(t, test.ESClientWithSourcemapNotFound(t)), - expectedErrorMsg: "No Sourcemap available"}, - "noMapping": {store: testSourcemapStore(t, test.ESClientWithValidSourcemap(t)), - expectedErrorMsg: "No Sourcemap found for Lineno", - }, - } { - t.Run(name, func(t *testing.T) { - frame := validFrame() - function, msg := frame.applySourcemap(context.Background(), tc.store, validService(), "xyz") - assert.Equal(t, "xyz", function) - require.Contains(t, msg, tc.expectedErrorMsg) - assert.NotZero(t, frame.SourcemapError) - assert.Equal(t, new(bool), frame.SourcemapUpdated) - }) - } - }) - - t.Run("mappingError", func(t *testing.T) { - for name, tc := range map[string]struct { - store *sourcemap.Store - expectedErrorMsg string - }{ - "ESUnavailable": {store: testSourcemapStore(t, test.ESClientUnavailable(t)), - expectedErrorMsg: "client error"}, - "invalidSourcemap": {store: testSourcemapStore(t, test.ESClientWithInvalidSourcemap(t)), - expectedErrorMsg: "Could not parse Sourcemap."}, - "unsupportedSourcemap": {store: testSourcemapStore(t, test.ESClientWithUnsupportedSourcemap(t)), - expectedErrorMsg: "only 3rd version is supported"}, - } { - t.Run(name, func(t *testing.T) { - frame := validFrame() - function, msg := frame.applySourcemap(context.Background(), tc.store, validService(), "xyz") - assert.Equal(t, "xyz", function) - require.Contains(t, msg, tc.expectedErrorMsg) - assert.NotZero(t, msg) - assert.Zero(t, frame.SourcemapUpdated) - assert.Zero(t, frame.SourcemapError) - }) - } - }) - - t.Run("mapping", func(t *testing.T) { - - for name, tc := range map[string]struct { - origCol, origLine int - origPath string - - function, file, path, ctxLine string - preCtx, postCtx []string - col, line int - }{ - "withFunction": {origCol: 67, origLine: 1, origPath: "/../a/path", - function: "exports", file: "", path: "/a/path", ctxLine: " \t\t\texports: {},", col: 0, line: 13, - preCtx: []string{" \t\tif(installedModules[moduleId])", " \t\t\treturn installedModules[moduleId].exports;", "", " \t\t// Create a new module (and put it into the cache)", " \t\tvar module = installedModules[moduleId] = {"}, - postCtx: []string{" \t\t\tid: moduleId,", " \t\t\tloaded: false", " \t\t};", "", " \t\t// Execute the module function"}}, - "withFilename": {origCol: 7, origLine: 1, origPath: "/../a/path", - function: "", file: "webpack:///bundle.js", path: "/a/path", - ctxLine: "/******/ (function(modules) { // webpackBootstrap", - preCtx: []string(nil), - postCtx: []string{"/******/ \t// The module cache", "/******/ \tvar installedModules = {};", "/******/", "/******/ \t// The require function", "/******/ \tfunction __webpack_require__(moduleId) {"}, - col: 9, line: 1}, - "withoutFilename": {origCol: 23, origLine: 1, origPath: "/../a/path", - function: "__webpack_require__", file: "", path: "/a/path", ctxLine: " \tfunction __webpack_require__(moduleId) {", - preCtx: []string{" \t// The module cache", " \tvar installedModules = {};", "", " \t// The require function"}, - postCtx: []string{"", " \t\t// Check if module is in cache", " \t\tif(installedModules[moduleId])", " \t\t\treturn installedModules[moduleId].exports;", ""}, - col: 0, line: 5}, - } { - t.Run(name, func(t *testing.T) { - frame := &StacktraceFrame{Colno: &tc.origCol, Lineno: &tc.origLine, AbsPath: tc.origPath} - - prevFunction := "xyz" - function, msg := frame.applySourcemap(context.Background(), testSourcemapStore(t, test.ESClientWithValidSourcemap(t)), validService(), prevFunction) - require.Empty(t, msg) - assert.Zero(t, frame.SourcemapError) - updated := true - assert.Equal(t, &updated, frame.SourcemapUpdated) - - assert.Equal(t, tc.function, function) - assert.Equal(t, prevFunction, frame.Function) - assert.Equal(t, tc.col, *frame.Colno) - assert.Equal(t, tc.line, *frame.Lineno) - assert.Equal(t, tc.path, frame.AbsPath) - assert.Equal(t, tc.ctxLine, frame.ContextLine) - assert.Equal(t, tc.preCtx, frame.PreContext) - assert.Equal(t, tc.postCtx, frame.PostContext) - assert.Equal(t, tc.file, frame.Filename) - assert.NotZero(t, frame.Original) - }) - } - }) -} - -func TestIsLibraryFrame(t *testing.T) { - assert.False(t, (&StacktraceFrame{}).IsLibraryFrame()) - assert.False(t, (&StacktraceFrame{LibraryFrame: new(bool)}).IsLibraryFrame()) - libFrame := true - assert.True(t, (&StacktraceFrame{LibraryFrame: &libFrame}).IsLibraryFrame()) -} - -func TestIsSourcemapApplied(t *testing.T) { - assert.False(t, (&StacktraceFrame{}).IsSourcemapApplied()) - - fr := StacktraceFrame{SourcemapUpdated: new(bool)} - assert.False(t, fr.IsSourcemapApplied()) - - libFrame := true - fr = StacktraceFrame{SourcemapUpdated: &libFrame} - assert.True(t, fr.IsSourcemapApplied()) -} - -func TestExcludeFromGroupingKey(t *testing.T) { - tests := []struct { - fr StacktraceFrame - pattern string - exclude bool - }{ - { - fr: StacktraceFrame{}, - pattern: "", - exclude: false, - }, - { - fr: StacktraceFrame{Filename: "/webpack"}, - pattern: "", - exclude: false, - }, - { - fr: StacktraceFrame{Filename: "/webpack"}, - pattern: "/webpack/tmp", - exclude: false, - }, - { - fr: StacktraceFrame{Filename: ""}, - pattern: "^/webpack", - exclude: false, - }, - { - fr: StacktraceFrame{Filename: "/webpack"}, - pattern: "^/webpack", - exclude: true, - }, - { - fr: StacktraceFrame{Filename: "/webpack/test/e2e/general-usecase/app.e2e-bundle.js"}, - pattern: "^/webpack", - exclude: true, - }, - { - fr: StacktraceFrame{Filename: "/filename"}, - pattern: "^/webpack", - exclude: false, - }, - { - fr: StacktraceFrame{Filename: "/filename/a"}, - pattern: "^/webpack", - exclude: false, - }, - { - fr: StacktraceFrame{Filename: "webpack"}, - pattern: "^/webpack", - exclude: false, - }, - } - - for idx, test := range tests { - var excludePattern *regexp.Regexp - if test.pattern != "" { - excludePattern = regexp.MustCompile(test.pattern) - } - - out := test.fr.transform(&transform.Config{ - RUM: transform.RUMConfig{ExcludeFromGrouping: excludePattern}, - }, true) - exclude := out["exclude_from_grouping"] - assert.Equal(t, test.exclude, exclude, - fmt.Sprintf("(%v): Pattern: %v, Filename: %v, expected to be excluded: %v", idx, test.pattern, test.fr.Filename, test.exclude)) - } -} - -func TestLibraryFrame(t *testing.T) { - - truthy := true - falsy := false - path := "/~/a/b" - tests := []struct { - fr StacktraceFrame - libraryPattern *regexp.Regexp - libraryFrame *bool - origLibraryFrame *bool - msg string - }{ - {fr: StacktraceFrame{}, - libraryFrame: nil, - origLibraryFrame: nil, - msg: "Empty StacktraceFrame, empty config"}, - {fr: StacktraceFrame{AbsPath: path}, - libraryFrame: nil, - origLibraryFrame: nil, - msg: "No pattern"}, - {fr: StacktraceFrame{AbsPath: path}, - libraryPattern: regexp.MustCompile(""), - libraryFrame: &truthy, - origLibraryFrame: nil, - msg: "Empty pattern"}, - {fr: StacktraceFrame{LibraryFrame: &falsy}, - libraryPattern: regexp.MustCompile("~"), - libraryFrame: &falsy, - origLibraryFrame: &falsy, - msg: "Empty StacktraceFrame"}, - {fr: StacktraceFrame{AbsPath: path, LibraryFrame: &truthy}, - libraryPattern: regexp.MustCompile("^~/"), - libraryFrame: &falsy, - origLibraryFrame: &truthy, - msg: "AbsPath given, no Match"}, - {fr: StacktraceFrame{Filename: "myFile.js", LibraryFrame: &truthy}, - libraryPattern: regexp.MustCompile("^~/"), - libraryFrame: &falsy, - origLibraryFrame: &truthy, - msg: "Filename given, no Match"}, - {fr: StacktraceFrame{AbsPath: path, Filename: "myFile.js"}, - libraryPattern: regexp.MustCompile("^~/"), - libraryFrame: &falsy, - origLibraryFrame: nil, - msg: "AbsPath and Filename given, no Match"}, - {fr: StacktraceFrame{Filename: "/tmp"}, - libraryPattern: regexp.MustCompile("/tmp"), - libraryFrame: &truthy, - origLibraryFrame: nil, - msg: "Filename matching"}, - {fr: StacktraceFrame{AbsPath: path, LibraryFrame: &falsy}, - libraryPattern: regexp.MustCompile("~/"), - libraryFrame: &truthy, - origLibraryFrame: &falsy, - msg: "AbsPath matching"}, - {fr: StacktraceFrame{AbsPath: path, Filename: "/a/b/c"}, - libraryPattern: regexp.MustCompile("~/"), - libraryFrame: &truthy, - origLibraryFrame: nil, - msg: "AbsPath matching, Filename not matching"}, - {fr: StacktraceFrame{AbsPath: path, Filename: "/a/b/c"}, - libraryPattern: regexp.MustCompile("/a/b/c"), - libraryFrame: &truthy, - origLibraryFrame: nil, - msg: "AbsPath not matching, Filename matching"}, - {fr: StacktraceFrame{AbsPath: path, Filename: "~/a/b/c"}, - libraryPattern: regexp.MustCompile("~/"), - libraryFrame: &truthy, - origLibraryFrame: nil, - msg: "AbsPath and Filename matching"}, - } - - for _, test := range tests { - cfg := transform.Config{ - RUM: transform.RUMConfig{ - LibraryPattern: test.libraryPattern, - }, - } - out := test.fr.transform(&cfg, true)["library_frame"] - libFrame := test.fr.LibraryFrame - origLibFrame := test.fr.Original.LibraryFrame - if test.libraryFrame == nil { - assert.Nil(t, out, test.msg) - assert.Nil(t, libFrame, test.msg) - } else { - assert.Equal(t, *test.libraryFrame, out, test.msg) - assert.Equal(t, *test.libraryFrame, *libFrame, test.msg) - } - if test.origLibraryFrame == nil { - assert.Nil(t, origLibFrame, test.msg) - } else { - assert.Equal(t, *test.origLibraryFrame, *origLibFrame, test.msg) - } - } -} - -func testSourcemapStore(t *testing.T, client elasticsearch.Client) *sourcemap.Store { - store, err := sourcemap.NewElasticsearchStore(client, "apm-*sourcemap*", time.Minute) - require.NoError(t, err) - return store -} diff --git a/model/stacktrace_test.go b/model/stacktrace_test.go index 24cf0b9073d..f7e701e253e 100644 --- a/model/stacktrace_test.go +++ b/model/stacktrace_test.go @@ -20,23 +20,37 @@ package model import ( "context" "fmt" + "regexp" "testing" "github.com/stretchr/testify/assert" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/apm-server/sourcemap/test" "github.com/elastic/apm-server/transform" ) func TestStacktraceTransform(t *testing.T) { - colno := 1 - l4, l5, l6, l8 := 4, 5, 6, 8 - fct := "original function" - origFilename, webpackFilename := "original filename", "/webpack" - absPath, serviceName := "original path", "service1" - service := Service{Name: serviceName} + originalLineno := 111 + originalColno := 222 + originalFunction := "original function" + originalFilename := "original filename" + originalModule := "original module" + originalClassname := "original classname" + originalAbsPath := "original path" + + mappedLineno := 333 + mappedColno := 444 + mappedFunction := "mapped function" + mappedFilename := "mapped filename" + mappedClassname := "mapped classname" + mappedAbsPath := "mapped path" + + vars := common.MapStr{"a": "abc", "b": 123} + + contextLine := "context line" + preContext := []string{"before1", "before2"} + postContext := []string{"after1", "after2"} tests := []struct { Stacktrace Stacktrace @@ -54,214 +68,263 @@ func TestStacktraceTransform(t *testing.T) { Msg: "Stacktrace with empty Frame", }, { - Stacktrace: Stacktrace{ - &StacktraceFrame{ - Colno: &colno, - Lineno: &l4, - Filename: origFilename, - Function: fct, - AbsPath: absPath, - }, - &StacktraceFrame{Colno: &colno, Lineno: &l6, Function: fct, AbsPath: absPath}, - &StacktraceFrame{Colno: &colno, Lineno: &l8, Function: fct, AbsPath: absPath}, - &StacktraceFrame{ - Colno: &colno, - Lineno: &l5, - Filename: origFilename, - Function: fct, - AbsPath: absPath, + Stacktrace: Stacktrace{{ + Colno: &originalColno, + Lineno: &originalLineno, + Filename: originalFilename, + Function: originalFunction, + Classname: originalClassname, + Module: originalModule, + AbsPath: originalAbsPath, + LibraryFrame: newBool(true), + Vars: vars, + }}, + Output: []common.MapStr{{ + "abs_path": "original path", + "filename": "original filename", + "function": "original function", + "classname": "original classname", + "module": "original module", + "line": common.MapStr{ + "number": 111, + "column": 222, }, - &StacktraceFrame{ - Colno: &colno, - Lineno: &l4, - Filename: webpackFilename, - AbsPath: absPath, - }, - }, - Output: []common.MapStr{ - { - "abs_path": "original path", "filename": "original filename", "function": "original function", - "line": common.MapStr{"column": 1, "number": 4}, - "exclude_from_grouping": false, + "exclude_from_grouping": false, + "library_frame": true, + "vars": vars, + }}, + Msg: "unmapped stacktrace", + }, + { + Stacktrace: Stacktrace{{ + Colno: &mappedColno, + Lineno: &mappedLineno, + Filename: mappedFilename, + Function: mappedFunction, + Classname: mappedClassname, + AbsPath: mappedAbsPath, + Original: Original{ + Colno: &originalColno, + Lineno: &originalLineno, + Filename: originalFilename, + Function: originalFunction, + Classname: originalClassname, + AbsPath: originalAbsPath, }, - { - "abs_path": "original path", "function": "original function", - "line": common.MapStr{"column": 1, "number": 6}, - "exclude_from_grouping": false, + ExcludeFromGrouping: true, + SourcemapUpdated: true, + ContextLine: contextLine, + PreContext: preContext, + PostContext: postContext, + }}, + Output: []common.MapStr{{ + "abs_path": "mapped path", + "filename": "mapped filename", + "function": "mapped function", + "classname": "mapped classname", + "line": common.MapStr{ + "number": 333, + "column": 444, + "context": "context line", }, - { - "abs_path": "original path", "function": "original function", - "line": common.MapStr{"column": 1, "number": 8}, - "exclude_from_grouping": false, + "context": common.MapStr{ + "pre": preContext, + "post": postContext, }, - { - "abs_path": "original path", "filename": "original filename", "function": "original function", - "line": common.MapStr{"column": 1, "number": 5}, - "exclude_from_grouping": false, + "original": common.MapStr{ + "abs_path": "original path", + "filename": "original filename", + "function": "original function", + "classname": "original classname", + "lineno": 111, + "colno": 222, }, - { - "abs_path": "original path", "filename": "/webpack", - "line": common.MapStr{"column": 1, "number": 4}, - "exclude_from_grouping": false, + "exclude_from_grouping": true, + "sourcemap": common.MapStr{ + "updated": true, }, - }, - Msg: "Stacktrace with sourcemapping", + }}, + Msg: "mapped stacktrace", }, } for idx, test := range tests { - output := test.Stacktrace.transform(context.Background(), &transform.Config{}, false, &service) + output := test.Stacktrace.transform(context.Background(), &transform.Config{}, false) assert.Equal(t, test.Output, output, fmt.Sprintf("Failed at idx %v; %s", idx, test.Msg)) } } -func TestStacktraceTransformWithSourcemapping(t *testing.T) { - int1, int6, int7, int67 := 1, 6, 7, 67 - fct1, fct2 := "function foo", "function bar" - absPath, serviceName, serviceVersion := "/../a/c", "service1", "2.4.1" - origFilename, appliedFilename := "original filename", "myfilename" - service := Service{Name: serviceName, Version: serviceVersion} +func TestStacktraceFrameIsLibraryFrame(t *testing.T) { + assert.False(t, (&StacktraceFrame{}).IsLibraryFrame()) + assert.False(t, (&StacktraceFrame{LibraryFrame: new(bool)}).IsLibraryFrame()) + libFrame := true + assert.True(t, (&StacktraceFrame{LibraryFrame: &libFrame}).IsLibraryFrame()) +} - for name, tc := range map[string]struct { - Stacktrace Stacktrace - Output []common.MapStr - Msg string +func TestStacktraceFrameExcludeFromGroupingKey(t *testing.T) { + tests := []struct { + fr StacktraceFrame + pattern string + exclude bool }{ - "emptyStacktrace": { - Stacktrace: Stacktrace{}, - Output: nil, + { + fr: StacktraceFrame{}, + pattern: "", + exclude: false, }, - "emptyFrame": { - Stacktrace: Stacktrace{&StacktraceFrame{}}, - Output: []common.MapStr{ - {"exclude_from_grouping": false, - "sourcemap": common.MapStr{ - "error": "Colno mandatory for sourcemapping.", - "updated": false, - }, - }, - }, + { + fr: StacktraceFrame{Filename: "/webpack"}, + pattern: "", + exclude: false, }, - "noLineno": { - Stacktrace: Stacktrace{&StacktraceFrame{Colno: &int1}}, - Output: []common.MapStr{ - {"line": common.MapStr{"column": 1}, - "exclude_from_grouping": false, - "sourcemap": common.MapStr{ - "error": "Lineno mandatory for sourcemapping.", - "updated": false, - }, - }, - }, + { + fr: StacktraceFrame{Filename: "/webpack"}, + pattern: "/webpack/tmp", + exclude: false, }, - "sourcemapApplied": { - Stacktrace: Stacktrace{ - &StacktraceFrame{ - Colno: &int7, - Lineno: &int1, - Filename: origFilename, - Function: fct1, - AbsPath: absPath, - }, - &StacktraceFrame{ - Colno: &int67, - Lineno: &int1, - Filename: appliedFilename, - Function: fct2, - AbsPath: absPath, - }, - &StacktraceFrame{ - Colno: &int7, - Lineno: &int1, - Filename: appliedFilename, - Function: fct2, - AbsPath: absPath, - }, - &StacktraceFrame{Colno: &int1, Lineno: &int6, Function: fct2, AbsPath: absPath}, - }, - Output: []common.MapStr{ - { - "abs_path": "/a/c", - "filename": "webpack:///bundle.js", - "function": "exports", - "context": common.MapStr{ - "post": []string{"/******/ \t// The module cache", "/******/ \tvar installedModules = {};", "/******/", "/******/ \t// The require function", "/******/ \tfunction __webpack_require__(moduleId) {"}}, - "line": common.MapStr{ - "column": 9, - "number": 1, - "context": "/******/ (function(modules) { // webpackBootstrap"}, - "exclude_from_grouping": false, - "sourcemap": common.MapStr{"updated": true}, - "original": common.MapStr{ - "abs_path": "/../a/c", - "colno": 7, - "filename": "original filename", - "function": "function foo", - "lineno": 1, - }, - }, - { - "abs_path": "/a/c", - "filename": "myfilename", - "function": "", //prev function - "context": common.MapStr{ - "post": []string{" \t\t\tid: moduleId,", " \t\t\tloaded: false", " \t\t};", "", " \t\t// Execute the module function"}, - "pre": []string{" \t\tif(installedModules[moduleId])", " \t\t\treturn installedModules[moduleId].exports;", "", " \t\t// Create a new module (and put it into the cache)", " \t\tvar module = installedModules[moduleId] = {"}}, - "line": common.MapStr{ - "column": 0, - "number": 13, - "context": " \t\t\texports: {},"}, - "exclude_from_grouping": false, - "sourcemap": common.MapStr{"updated": true}, - "original": common.MapStr{ - "abs_path": "/../a/c", - "colno": 67, - "filename": "myfilename", - "function": "function bar", - "lineno": 1, - }, - }, - { - "abs_path": "/a/c", - "filename": "webpack:///bundle.js", - "function": "", //prev function - "context": common.MapStr{ - "post": []string{"/******/ \t// The module cache", "/******/ \tvar installedModules = {};", "/******/", "/******/ \t// The require function", "/******/ \tfunction __webpack_require__(moduleId) {"}}, - "line": common.MapStr{ - "column": 9, - "number": 1, - "context": "/******/ (function(modules) { // webpackBootstrap"}, - "exclude_from_grouping": false, - "sourcemap": common.MapStr{"updated": true}, - "original": common.MapStr{ - "abs_path": "/../a/c", - "colno": 7, - "filename": "myfilename", - "function": "function bar", - "lineno": 1, - }, - }, - { - "abs_path": "/../a/c", - "function": fct2, - "line": common.MapStr{"column": 1, "number": 6}, - "exclude_from_grouping": false, - "sourcemap": common.MapStr{"updated": false, "error": "No Sourcemap found for Lineno 6, Colno 1"}, - }, - }, + { + fr: StacktraceFrame{Filename: ""}, + pattern: "^/webpack", + exclude: false, }, - } { - t.Run(name, func(t *testing.T) { - cfg := &transform.Config{ - RUM: transform.RUMConfig{ - SourcemapStore: testSourcemapStore(t, test.ESClientWithValidSourcemap(t)), - }, - } + { + fr: StacktraceFrame{Filename: "/webpack"}, + pattern: "^/webpack", + exclude: true, + }, + { + fr: StacktraceFrame{Filename: "/webpack/test/e2e/general-usecase/app.e2e-bundle.js"}, + pattern: "^/webpack", + exclude: true, + }, + { + fr: StacktraceFrame{Filename: "/filename"}, + pattern: "^/webpack", + exclude: false, + }, + { + fr: StacktraceFrame{Filename: "/filename/a"}, + pattern: "^/webpack", + exclude: false, + }, + { + fr: StacktraceFrame{Filename: "webpack"}, + pattern: "^/webpack", + exclude: false, + }, + } + + for idx, test := range tests { + var excludePattern *regexp.Regexp + if test.pattern != "" { + excludePattern = regexp.MustCompile(test.pattern) + } + + out := test.fr.transform(&transform.Config{ + RUM: transform.RUMConfig{ExcludeFromGrouping: excludePattern}, + }, true) + exclude := out["exclude_from_grouping"] + message := fmt.Sprintf( + "(%v): Pattern: %v, Filename: %v, expected to be excluded: %v", + idx, test.pattern, test.fr.Filename, test.exclude, + ) + assert.Equal(t, test.exclude, exclude, message) + } +} - // run `Stacktrace.transform` twice to ensure method is idempotent - tc.Stacktrace.transform(context.Background(), cfg, true, &service) - output := tc.Stacktrace.transform(context.Background(), cfg, true, &service) - assert.Equal(t, tc.Output, output) - }) +func TestStacktraceFrameLibraryFrame(t *testing.T) { + path := "/~/a/b" + tests := []struct { + fr StacktraceFrame + libraryPattern *regexp.Regexp + libraryFrame *bool + origLibraryFrame *bool + msg string + }{ + {fr: StacktraceFrame{}, + libraryFrame: nil, + origLibraryFrame: nil, + msg: "Empty StacktraceFrame, empty config"}, + {fr: StacktraceFrame{AbsPath: path}, + libraryFrame: nil, + origLibraryFrame: nil, + msg: "No pattern"}, + {fr: StacktraceFrame{AbsPath: path}, + libraryPattern: regexp.MustCompile(""), + libraryFrame: newBool(true), + origLibraryFrame: nil, + msg: "Empty pattern"}, + {fr: StacktraceFrame{LibraryFrame: newBool(false)}, + libraryPattern: regexp.MustCompile("~"), + libraryFrame: newBool(false), + origLibraryFrame: newBool(false), + msg: "Empty StacktraceFrame"}, + {fr: StacktraceFrame{AbsPath: path, LibraryFrame: newBool(true)}, + libraryPattern: regexp.MustCompile("^~/"), + libraryFrame: newBool(false), + origLibraryFrame: newBool(true), + msg: "AbsPath given, no Match"}, + {fr: StacktraceFrame{Filename: "myFile.js", LibraryFrame: newBool(true)}, + libraryPattern: regexp.MustCompile("^~/"), + libraryFrame: newBool(false), + origLibraryFrame: newBool(true), + msg: "Filename given, no Match"}, + {fr: StacktraceFrame{AbsPath: path, Filename: "myFile.js"}, + libraryPattern: regexp.MustCompile("^~/"), + libraryFrame: newBool(false), + origLibraryFrame: nil, + msg: "AbsPath and Filename given, no Match"}, + {fr: StacktraceFrame{Filename: "/tmp"}, + libraryPattern: regexp.MustCompile("/tmp"), + libraryFrame: newBool(true), + origLibraryFrame: nil, + msg: "Filename matching"}, + {fr: StacktraceFrame{AbsPath: path, LibraryFrame: newBool(false)}, + libraryPattern: regexp.MustCompile("~/"), + libraryFrame: newBool(true), + origLibraryFrame: newBool(false), + msg: "AbsPath matching"}, + {fr: StacktraceFrame{AbsPath: path, Filename: "/a/b/c"}, + libraryPattern: regexp.MustCompile("~/"), + libraryFrame: newBool(true), + origLibraryFrame: nil, + msg: "AbsPath matching, Filename not matching"}, + {fr: StacktraceFrame{AbsPath: path, Filename: "/a/b/c"}, + libraryPattern: regexp.MustCompile("/a/b/c"), + libraryFrame: newBool(true), + origLibraryFrame: nil, + msg: "AbsPath not matching, Filename matching"}, + {fr: StacktraceFrame{AbsPath: path, Filename: "~/a/b/c"}, + libraryPattern: regexp.MustCompile("~/"), + libraryFrame: newBool(true), + origLibraryFrame: nil, + msg: "AbsPath and Filename matching"}, } + + for _, test := range tests { + cfg := transform.Config{ + RUM: transform.RUMConfig{ + LibraryPattern: test.libraryPattern, + }, + } + out := test.fr.transform(&cfg, true)["library_frame"] + libFrame := test.fr.LibraryFrame + origLibFrame := test.fr.Original.LibraryFrame + if test.libraryFrame == nil { + assert.Nil(t, out, test.msg) + assert.Nil(t, libFrame, test.msg) + } else { + assert.Equal(t, *test.libraryFrame, out, test.msg) + assert.Equal(t, *test.libraryFrame, *libFrame, test.msg) + } + if test.origLibraryFrame == nil { + assert.Nil(t, origLibFrame, test.msg) + } else { + assert.Equal(t, *test.origLibraryFrame, *origLibFrame, test.msg) + } + } +} + +func newBool(v bool) *bool { + return &v } diff --git a/sourcemap/processor.go b/sourcemap/processor.go new file mode 100644 index 00000000000..cd904ee5649 --- /dev/null +++ b/sourcemap/processor.go @@ -0,0 +1,171 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package sourcemap + +import ( + "context" + + "github.com/elastic/apm-server/model" + "github.com/elastic/apm-server/utility" +) + +// BatchProcessor is a model.BatchProcessor that performs source +// mapping for span and error events. +type BatchProcessor struct { + Store *Store +} + +// ProcessBatch processes spans and errors, applying source maps +// to their stack traces. +func (p BatchProcessor) ProcessBatch(ctx context.Context, batch *model.Batch) error { + for _, event := range *batch { + switch { + case event.Span != nil: + if err := p.processSpan(ctx, event.Span); err != nil { + return err + } + case event.Error != nil: + if err := p.processError(ctx, event.Error); err != nil { + return err + } + } + } + return nil +} + +func (p BatchProcessor) processSpan(ctx context.Context, event *model.Span) error { + if event.Metadata.Service.Name == "" || event.Metadata.Service.Version == "" { + return nil + } + if err := p.processStacktraceFrames(ctx, &event.Metadata.Service, event.Stacktrace...); err != nil { + return err + } + return nil +} + +func (p BatchProcessor) processError(ctx context.Context, event *model.Error) error { + if event.Metadata.Service.Name == "" || event.Metadata.Service.Version == "" { + return nil + } + if event.Log != nil { + if err := p.processStacktraceFrames( + ctx, &event.Metadata.Service, + event.Log.Stacktrace..., + ); err != nil { + return err + } + } + if event.Exception != nil { + if err := p.processException(ctx, &event.Metadata.Service, event.Exception); err != nil { + return err + } + } + return nil +} + +func (p BatchProcessor) processException(ctx context.Context, service *model.Service, exception *model.Exception) error { + if err := p.processStacktraceFrames(ctx, service, exception.Stacktrace...); err != nil { + return err + } + for _, cause := range exception.Cause { + if err := p.processStacktraceFrames(ctx, service, cause.Stacktrace...); err != nil { + return err + } + } + return nil +} + +// source map algorithm: +// +// apply source mapping frame by frame +// if no source map could be found, set updated to false and set sourcemap error +// otherwise use source map library for mapping and update +// - filename: only if it was found +// - function: +// * should be moved down one stack trace frame, +// * the function name of the first frame is set to +// * if one frame is not found in the source map, this frame is left out and +// the function name from the previous frame is used +// * if a mapping could be applied but no function name is found, the +// function name for the next frame is set to +// - colno +// - lineno +// - abs_path is set to the cleaned abs_path +// - sourcemap.updated is set to true +func (p BatchProcessor) processStacktraceFrames(ctx context.Context, service *model.Service, frames ...*model.StacktraceFrame) error { + prevFunction := "" + for i := len(frames) - 1; i >= 0; i-- { + frame := frames[i] + mapped, function, err := p.processStacktraceFrame(ctx, service, frame, prevFunction) + if err != nil { + return err + } + if mapped { + prevFunction = function + } + } + return nil +} + +func (p BatchProcessor) processStacktraceFrame( + ctx context.Context, + service *model.Service, + frame *model.StacktraceFrame, + prevFunction string, +) (bool, string, error) { + if frame.Colno == nil || frame.Lineno == nil || frame.AbsPath == "" { + return false, "", nil + } + + path := utility.CleanUrlPath(frame.AbsPath) + mapper, err := p.Store.Fetch(ctx, service.Name, service.Version, path) + if err != nil { + return false, "", err + } + if mapper == nil { + return false, "", nil + } + file, function, lineno, colno, ctxLine, preCtx, postCtx, ok := Map(mapper, *frame.Lineno, *frame.Colno) + if !ok { + return false, "", nil + } + + // Store original source information. + frame.Original.Colno = frame.Colno + frame.Original.AbsPath = frame.AbsPath + frame.Original.Function = frame.Function + frame.Original.Lineno = frame.Lineno + frame.Original.Filename = frame.Filename + frame.Original.Classname = frame.Classname + + if file != "" { + frame.Filename = file + } + frame.Colno = &colno + frame.Lineno = &lineno + frame.AbsPath = path + frame.SourcemapUpdated = true + frame.Function = prevFunction + frame.ContextLine = ctxLine + frame.PreContext = preCtx + frame.PostContext = postCtx + if function == "" { + function = "" + } + return true, function, nil +} diff --git a/sourcemap/processor_test.go b/sourcemap/processor_test.go new file mode 100644 index 00000000000..c9201cce0f5 --- /dev/null +++ b/sourcemap/processor_test.go @@ -0,0 +1,222 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package sourcemap_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/apm-server/model" + "github.com/elastic/apm-server/sourcemap" + "github.com/elastic/apm-server/sourcemap/test" +) + +func TestBatchProcessor(t *testing.T) { + client := test.ESClientWithValidSourcemap(t) + store, err := sourcemap.NewElasticsearchStore(client, "index", time.Minute) + require.NoError(t, err) + + originalLinenoWithFilename := 1 + originalColnoWithFilename := 7 + originalLinenoWithoutFilename := 1 + originalColnoWithoutFilename := 23 + originalLinenoWithFunction := 1 + originalColnoWithFunction := 67 + + metadata := model.Metadata{ + Service: model.Service{ + Name: "service_name", + Version: "service_version", + }, + } + nonMatchingFrame := model.StacktraceFrame{ + AbsPath: "bundle.js", + Lineno: newInt(0), + Colno: newInt(0), + Function: "original function", + } + mappedFrameWithFilename := model.StacktraceFrame{ + AbsPath: "bundle.js", + Function: "", + Filename: "webpack:///bundle.js", + Lineno: newInt(1), + Colno: newInt(9), + ContextLine: "/******/ (function(modules) { // webpackBootstrap", + PostContext: []string{ + "/******/ \t// The module cache", + "/******/ \tvar installedModules = {};", + "/******/", + "/******/ \t// The require function", + "/******/ \tfunction __webpack_require__(moduleId) {", + }, + Original: model.Original{ + AbsPath: "bundle.js", + Lineno: &originalLinenoWithFilename, + Colno: &originalColnoWithFilename, + Function: "original function", + }, + SourcemapUpdated: true, + } + + mappedFrameWithoutFilename := mappedFrameWithFilename + mappedFrameWithoutFilename.Original.Lineno = &originalLinenoWithoutFilename + mappedFrameWithoutFilename.Original.Colno = &originalColnoWithoutFilename + mappedFrameWithoutFilename.Lineno = newInt(5) + mappedFrameWithoutFilename.Colno = newInt(0) + mappedFrameWithoutFilename.Filename = "" + mappedFrameWithoutFilename.ContextLine = " \tfunction __webpack_require__(moduleId) {" + mappedFrameWithoutFilename.PreContext = []string{ + " \t// The module cache", + " \tvar installedModules = {};", + "", + " \t// The require function", + } + mappedFrameWithoutFilename.PostContext = []string{ + "", + " \t\t// Check if module is in cache", + " \t\tif(installedModules[moduleId])", + " \t\t\treturn installedModules[moduleId].exports;", + "", + } + + mappedFrameWithFunction := mappedFrameWithoutFilename + mappedFrameWithFunction.Original.Lineno = &originalLinenoWithFunction + mappedFrameWithFunction.Original.Colno = &originalColnoWithFunction + mappedFrameWithFunction.Lineno = newInt(13) + mappedFrameWithFunction.Colno = newInt(0) + mappedFrameWithFunction.ContextLine = " \t\t\texports: {}," + mappedFrameWithFunction.PreContext = []string{ + " \t\tif(installedModules[moduleId])", + " \t\t\treturn installedModules[moduleId].exports;", + "", + " \t\t// Create a new module (and put it into the cache)", + " \t\tvar module = installedModules[moduleId] = {", + } + mappedFrameWithFunction.PostContext = []string{ + " \t\t\tid: moduleId,", + " \t\t\tloaded: false", + " \t\t};", + "", + " \t\t// Execute the module function", + } + mappedFrameWithFunction2 := mappedFrameWithFunction + mappedFrameWithFunction2.Function = "exports" + + transaction := &model.Transaction{} + span1 := &model.Span{} // intentionally left blank + error1 := &model.Error{Metadata: metadata} + span2 := &model.Span{ + Metadata: metadata, + Stacktrace: model.Stacktrace{cloneFrame(nonMatchingFrame), { + AbsPath: "bundle.js", + Lineno: newInt(originalLinenoWithFilename), + Colno: newInt(originalColnoWithFilename), + Function: "original function", + }}, + } + error2 := &model.Error{ + Metadata: metadata, + Log: &model.Log{ + Stacktrace: model.Stacktrace{{ + AbsPath: "bundle.js", + Lineno: newInt(originalLinenoWithoutFilename), + Colno: newInt(originalColnoWithoutFilename), + Function: "original function", + }}, + }, + } + error3 := &model.Error{ + Metadata: metadata, + Exception: &model.Exception{ + Stacktrace: model.Stacktrace{{ + AbsPath: "bundle.js", + Lineno: newInt(originalLinenoWithFunction), + Colno: newInt(originalColnoWithFunction), + Function: "original function", + }}, + Cause: []model.Exception{{ + Stacktrace: model.Stacktrace{{ + AbsPath: "bundle.js", + Lineno: newInt(originalLinenoWithFunction), + Colno: newInt(originalColnoWithFunction), + Function: "original function", + }, { + AbsPath: "bundle.js", + Lineno: newInt(originalLinenoWithFunction), + Colno: newInt(originalColnoWithFunction), + Function: "original function", + }}, + }}, + }, + } + + processor := sourcemap.BatchProcessor{Store: store} + err = processor.ProcessBatch(context.Background(), &model.Batch{ + {Transaction: transaction}, + {Span: span1}, + {Span: span2}, + {Error: error1}, + {Error: error2}, + {Error: error3}, + }) + assert.NoError(t, err) + + assert.Equal(t, &model.Span{}, span1) + assert.Equal(t, &model.Error{Metadata: metadata}, error1) + assert.Equal(t, &model.Span{ + Metadata: metadata, + Stacktrace: model.Stacktrace{ + cloneFrame(nonMatchingFrame), + cloneFrame(mappedFrameWithFilename), + }, + }, span2) + assert.Equal(t, &model.Error{ + Metadata: metadata, + Log: &model.Log{ + Stacktrace: model.Stacktrace{ + cloneFrame(mappedFrameWithoutFilename), + }, + }, + }, error2) + assert.Equal(t, &model.Error{ + Metadata: metadata, + Exception: &model.Exception{ + Stacktrace: model.Stacktrace{ + cloneFrame(mappedFrameWithFunction), + }, + Cause: []model.Exception{{ + Stacktrace: model.Stacktrace{ + cloneFrame(mappedFrameWithFunction2), + cloneFrame(mappedFrameWithFunction), + }, + }}, + }, + }, error3) +} + +func cloneFrame(frame model.StacktraceFrame) *model.StacktraceFrame { + return &frame +} + +func newInt(v int) *int { + return &v +} diff --git a/sourcemap/store.go b/sourcemap/store.go index aec47b54833..f7cdc61c74f 100644 --- a/sourcemap/store.go +++ b/sourcemap/store.go @@ -138,8 +138,9 @@ func (s *Store) Fetch(ctx context.Context, name, version, path string) (*sourcem return consumer, nil } -// Added ensures the internal cache is cleared for the given parameters. This should be called when a sourcemap is uploaded. -func (s *Store) Added(ctx context.Context, name string, version string, path string) { +// NotifyAdded ensures the internal cache is cleared for the given parameters. +// This should be called when a sourcemap is uploaded. +func (s *Store) NotifyAdded(ctx context.Context, name string, version string, path string) { if sourcemap, err := s.Fetch(ctx, name, version, path); err == nil && sourcemap != nil { s.logger.Warnf("Overriding sourcemap for service %s version %s and file %s", name, version, path) diff --git a/sourcemap/store_test.go b/sourcemap/store_test.go index 2ee44cd4480..dc84f319d95 100644 --- a/sourcemap/store_test.go +++ b/sourcemap/store_test.go @@ -308,7 +308,7 @@ func TestStore_Added(t *testing.T) { assert.Equal(t, "", mapper.File()) // remove from cache, afterwards sourcemap should be fetched from ES - store.Added(context.Background(), name, version, path) + store.NotifyAdded(context.Background(), name, version, path) mapper, err = store.Fetch(context.Background(), name, version, path) require.NoError(t, err) assert.NotNil(t, &sourcemap.Consumer{}, mapper) diff --git a/systemtest/approvals/TestNoMatchingSourcemap.approved.json b/systemtest/approvals/TestNoMatchingSourcemap.approved.json index 58cd33d382d..9232321713c 100644 --- a/systemtest/approvals/TestNoMatchingSourcemap.approved.json +++ b/systemtest/approvals/TestNoMatchingSourcemap.approved.json @@ -54,10 +54,6 @@ "line": { "column": 18, "number": 1 - }, - "sourcemap": { - "error": "No Sourcemap available for ServiceName apm-agent-js, ServiceVersion 1.0.0, Path http://subdomain1.localhost:8000/test/e2e/general-usecase/bundle.js.map.", - "updated": false } }, { @@ -69,10 +65,6 @@ "line": { "column": 18, "number": 1 - }, - "sourcemap": { - "error": "No Sourcemap available for ServiceName apm-agent-js, ServiceVersion 1.0.0, Path http://subdomain2.localhost:8000/test/e2e/general-usecase/bundle.js.map.", - "updated": false } } ], diff --git a/systemtest/sourcemap_test.go b/systemtest/sourcemap_test.go index 26ec5fa7091..6cf2954a21f 100644 --- a/systemtest/sourcemap_test.go +++ b/systemtest/sourcemap_test.go @@ -25,9 +25,7 @@ import ( "net/http" "os" "path/filepath" - "strings" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -105,7 +103,15 @@ func TestDuplicateSourcemapWarning(t *testing.T) { ) systemtest.Elasticsearch.ExpectMinDocs(t, 2, "apm-*-sourcemap", nil) - expectLogMessage(t, srv, "Overriding sourcemap") + require.NoError(t, srv.Close()) + var messages []string + for _, entry := range srv.Logs.All() { + messages = append(messages, entry.Message) + } + assert.Contains(t, messages, + `Overriding sourcemap for service apm-agent-js version 1.0.0 and `+ + `file http://localhost:8000/test/e2e/general-usecase/bundle.js.map`, + ) } func TestNoMatchingSourcemap(t *testing.T) { @@ -149,8 +155,8 @@ func TestFetchLatestSourcemap(t *testing.T) { systemtest.Elasticsearch.ExpectDocs(t, "apm-*-sourcemap", nil) systemtest.SendRUMEventsPayload(t, srv, "../testdata/intake-v2/errors_rum.ndjson") - systemtest.Elasticsearch.ExpectDocs(t, "apm-*-error", nil) - expectLogMessage(t, srv, "No Sourcemap available") + result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*-error", nil) + assertSourcemapUpdated(t, result, false) deleteIndex(t, "apm-*-error*") // upload second sourcemap file with same key, @@ -166,18 +172,17 @@ func TestFetchLatestSourcemap(t *testing.T) { systemtest.Elasticsearch.ExpectMinDocs(t, 2, "apm-*-sourcemap", nil) systemtest.SendRUMEventsPayload(t, srv, "../testdata/intake-v2/errors_rum.ndjson") - result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*-error", nil) - expectSourcemapUpdated(t, result) + result = systemtest.Elasticsearch.ExpectDocs(t, "apm-*-error", nil) + assertSourcemapUpdated(t, result, true) } -func TestSourcemapCacheUsage(t *testing.T) { +func TestSourcemapCaching(t *testing.T) { systemtest.CleanupElasticsearch(t) srv := apmservertest.NewUnstartedServer(t) srv.Config.RUM = &apmservertest.RUMConfig{Enabled: true} err := srv.Start() require.NoError(t, err) - // upload valid sourcemap uploadSourcemap(t, srv, "../testdata/sourcemap/bundle.js.map", "http://localhost:8000/test/e2e/general-usecase/bundle.js.map", "apm-agent-js", @@ -185,55 +190,17 @@ func TestSourcemapCacheUsage(t *testing.T) { ) systemtest.Elasticsearch.ExpectDocs(t, "apm-*-sourcemap", nil) - // trigger cache + // Index an error, applying source mapping and caching the source map in the process. systemtest.SendRUMEventsPayload(t, srv, "../testdata/intake-v2/errors_rum.ndjson") result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*-error", nil) - expectSourcemapUpdated(t, result) + assertSourcemapUpdated(t, result, true) - // delete sourcemap index + // Delete the source map and error, and try again. deleteIndex(t, "apm-*-sourcemap*") - - // index error document again - systemtest.SendRUMEventsPayload(t, srv, "../testdata/intake-v2/errors_rum.ndjson") - result = systemtest.Elasticsearch.ExpectMinDocs(t, 2, "apm-*-error", nil) - - // check that sourcemap is updated, meaning it was applied from the cache - expectSourcemapUpdated(t, result) -} - -func TestSourcemapCacheExpiration(t *testing.T) { - systemtest.CleanupElasticsearch(t) - srv := apmservertest.NewUnstartedServer(t) - srv.Config.RUM = &apmservertest.RUMConfig{Enabled: true, - Sourcemap: &apmservertest.RUMSourcemapConfig{ - Enabled: true, - Cache: &apmservertest.RUMSourcemapCacheConfig{Expiration: time.Second}, - }} - err := srv.Start() - require.NoError(t, err) - - // upload valid sourcemap - uploadSourcemap(t, srv, "../testdata/sourcemap/bundle.js.map", - "http://localhost:8000/test/e2e/general-usecase/bundle.js.map", - "apm-agent-js", - "1.0.0", - ) - systemtest.Elasticsearch.ExpectDocs(t, "apm-*-sourcemap", nil) - - // trigger cache - systemtest.SendRUMEventsPayload(t, srv, "../testdata/intake-v2/errors_rum.ndjson") - systemtest.Elasticsearch.ExpectDocs(t, "apm-*-error", nil) - - // delete sourcemap index - deleteIndex(t, "apm-*-sourcemap*") - - // wait for the cache to expire - time.Sleep(time.Second) - - // no sourcemap available, since the index was removed and the cache was expired + deleteIndex(t, "apm-*-error*") systemtest.SendRUMEventsPayload(t, srv, "../testdata/intake-v2/errors_rum.ndjson") - systemtest.Elasticsearch.ExpectDocs(t, "apm-*-error", nil) - expectLogMessage(t, srv, "No Sourcemap available") + result = systemtest.Elasticsearch.ExpectMinDocs(t, 1, "apm-*-error", nil) + assertSourcemapUpdated(t, result, true) } func uploadSourcemap(t *testing.T, srv *apmservertest.Server, sourcemapFile, bundleFilepath, serviceName, serviceVersion string) { @@ -272,22 +239,6 @@ func newUploadSourcemapRequest(t *testing.T, srv *apmservertest.Server, sourcema return req } -func expectLogMessage(t *testing.T, srv *apmservertest.Server, message string) { - timeout := time.After(time.Minute) - logs := srv.Logs.Iterator() - defer logs.Close() - for { - select { - case entry := <-logs.C(): - if strings.Contains(entry.Message, message) { - return - } - case <-timeout: - t.Fatal("timed out waiting for log message") - } - } -} - func deleteIndex(t *testing.T, name string) { resp, err := systemtest.Elasticsearch.Indices.Delete([]string{name}) require.NoError(t, err) @@ -297,7 +248,23 @@ func deleteIndex(t *testing.T, name string) { resp.Body.Close() } -func expectSourcemapUpdated(t *testing.T, result estest.SearchResult) { +func assertSourcemapUpdated(t *testing.T, result estest.SearchResult, updated bool) { + t.Helper() + + type StacktraceFrame struct { + Sourcemap struct { + Updated bool + } + } + type Error struct { + Exception []struct { + Stacktrace []StacktraceFrame + } + Log struct { + Stacktrace []StacktraceFrame + } + } + for _, hit := range result.Hits.Hits { var source struct { Error Error @@ -307,33 +274,12 @@ func expectSourcemapUpdated(t *testing.T, result estest.SearchResult) { for _, exception := range source.Error.Exception { for _, stacktrace := range exception.Stacktrace { - assert.True(t, stacktrace.Sourcemap.Updated) + assert.Equal(t, updated, stacktrace.Sourcemap.Updated) } } for _, stacktrace := range source.Error.Log.Stacktrace { - assert.True(t, stacktrace.Sourcemap.Updated) + assert.Equal(t, updated, stacktrace.Sourcemap.Updated) } } } - -type Error struct { - Exception []Exception - Log Log -} - -type Exception struct { - Stacktrace []Stacktrace -} - -type Log struct { - Stacktrace []Stacktrace -} - -type Stacktrace struct { - Sourcemap Sourcemap -} - -type Sourcemap struct { - Updated bool -} diff --git a/transform/transform.go b/transform/transform.go index 1b2733da2ba..58ee99523e8 100644 --- a/transform/transform.go +++ b/transform/transform.go @@ -22,8 +22,6 @@ import ( "regexp" "github.com/elastic/beats/v7/libbeat/beat" - - "github.com/elastic/apm-server/sourcemap" ) // Transformable is an interface implemented by all top-level model objects for @@ -45,5 +43,4 @@ type Config struct { type RUMConfig struct { LibraryPattern *regexp.Regexp ExcludeFromGrouping *regexp.Regexp - SourcemapStore *sourcemap.Store } From 4fd5174fbcf5cb4827e43d8f0a99e5ad25c4ee01 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Thu, 8 Jul 2021 13:04:55 +0800 Subject: [PATCH 2/4] Reintroduce `sourcemap.error` If there is an error fetching a source map, we once again set `sourcemap.error` on the frame and log the error at debug level. The logging is now using zap's rate limited logging, rather than storing in a map. --- model/stacktrace.go | 2 + model/stacktrace_test.go | 2 + sourcemap/processor.go | 86 ++++++++++++++++++------------------- sourcemap/processor_test.go | 43 +++++++++++++++++++ 4 files changed, 90 insertions(+), 43 deletions(-) diff --git a/model/stacktrace.go b/model/stacktrace.go index 95d294f9d36..9ef562d3f50 100644 --- a/model/stacktrace.go +++ b/model/stacktrace.go @@ -45,6 +45,7 @@ type StacktraceFrame struct { ExcludeFromGrouping bool SourcemapUpdated bool + SourcemapError string Original Original } @@ -109,6 +110,7 @@ func (s *StacktraceFrame) transform(cfg *transform.Config, rum bool) common.MapS if s.SourcemapUpdated { sm.set("updated", true) } + sm.maybeSetString("error", s.SourcemapError) m.maybeSetMapStr("sourcemap", common.MapStr(sm)) var orig mapStr diff --git a/model/stacktrace_test.go b/model/stacktrace_test.go index f7e701e253e..1a64901758c 100644 --- a/model/stacktrace_test.go +++ b/model/stacktrace_test.go @@ -113,6 +113,7 @@ func TestStacktraceTransform(t *testing.T) { }, ExcludeFromGrouping: true, SourcemapUpdated: true, + SourcemapError: "boom", ContextLine: contextLine, PreContext: preContext, PostContext: postContext, @@ -142,6 +143,7 @@ func TestStacktraceTransform(t *testing.T) { "exclude_from_grouping": true, "sourcemap": common.MapStr{ "updated": true, + "error": "boom", }, }}, Msg: "mapped stacktrace", diff --git a/sourcemap/processor.go b/sourcemap/processor.go index cd904ee5649..e734cabf8fb 100644 --- a/sourcemap/processor.go +++ b/sourcemap/processor.go @@ -19,7 +19,12 @@ package sourcemap import ( "context" + "sync" + "time" + "github.com/elastic/beats/v7/libbeat/logp" + + logs "github.com/elastic/apm-server/log" "github.com/elastic/apm-server/model" "github.com/elastic/apm-server/utility" ) @@ -36,58 +41,38 @@ func (p BatchProcessor) ProcessBatch(ctx context.Context, batch *model.Batch) er for _, event := range *batch { switch { case event.Span != nil: - if err := p.processSpan(ctx, event.Span); err != nil { - return err - } + p.processSpan(ctx, event.Span) case event.Error != nil: - if err := p.processError(ctx, event.Error); err != nil { - return err - } + p.processError(ctx, event.Error) } } return nil } -func (p BatchProcessor) processSpan(ctx context.Context, event *model.Span) error { +func (p BatchProcessor) processSpan(ctx context.Context, event *model.Span) { if event.Metadata.Service.Name == "" || event.Metadata.Service.Version == "" { - return nil - } - if err := p.processStacktraceFrames(ctx, &event.Metadata.Service, event.Stacktrace...); err != nil { - return err + return } - return nil + p.processStacktraceFrames(ctx, &event.Metadata.Service, event.Stacktrace...) } -func (p BatchProcessor) processError(ctx context.Context, event *model.Error) error { +func (p BatchProcessor) processError(ctx context.Context, event *model.Error) { if event.Metadata.Service.Name == "" || event.Metadata.Service.Version == "" { - return nil + return } if event.Log != nil { - if err := p.processStacktraceFrames( - ctx, &event.Metadata.Service, - event.Log.Stacktrace..., - ); err != nil { - return err - } + p.processStacktraceFrames(ctx, &event.Metadata.Service, event.Log.Stacktrace...) } if event.Exception != nil { - if err := p.processException(ctx, &event.Metadata.Service, event.Exception); err != nil { - return err - } + p.processException(ctx, &event.Metadata.Service, event.Exception) } - return nil } -func (p BatchProcessor) processException(ctx context.Context, service *model.Service, exception *model.Exception) error { - if err := p.processStacktraceFrames(ctx, service, exception.Stacktrace...); err != nil { - return err - } +func (p BatchProcessor) processException(ctx context.Context, service *model.Service, exception *model.Exception) { + p.processStacktraceFrames(ctx, service, exception.Stacktrace...) for _, cause := range exception.Cause { - if err := p.processStacktraceFrames(ctx, service, cause.Stacktrace...); err != nil { - return err - } + p.processStacktraceFrames(ctx, service, cause.Stacktrace...) } - return nil } // source map algorithm: @@ -111,11 +96,7 @@ func (p BatchProcessor) processStacktraceFrames(ctx context.Context, service *mo prevFunction := "" for i := len(frames) - 1; i >= 0; i-- { frame := frames[i] - mapped, function, err := p.processStacktraceFrame(ctx, service, frame, prevFunction) - if err != nil { - return err - } - if mapped { + if mapped, function := p.processStacktraceFrame(ctx, service, frame, prevFunction); mapped { prevFunction = function } } @@ -127,22 +108,24 @@ func (p BatchProcessor) processStacktraceFrame( service *model.Service, frame *model.StacktraceFrame, prevFunction string, -) (bool, string, error) { +) (bool, string) { if frame.Colno == nil || frame.Lineno == nil || frame.AbsPath == "" { - return false, "", nil + return false, "" } path := utility.CleanUrlPath(frame.AbsPath) mapper, err := p.Store.Fetch(ctx, service.Name, service.Version, path) if err != nil { - return false, "", err + frame.SourcemapError = err.Error() + getProcessorLogger().Debugf("failed to fetch source map: %s", frame.SourcemapError) + return false, "" } if mapper == nil { - return false, "", nil + return false, "" } file, function, lineno, colno, ctxLine, preCtx, postCtx, ok := Map(mapper, *frame.Lineno, *frame.Colno) if !ok { - return false, "", nil + return false, "" } // Store original source information. @@ -167,5 +150,22 @@ func (p BatchProcessor) processStacktraceFrame( if function == "" { function = "" } - return true, function, nil + return true, function } + +func getProcessorLogger() *logp.Logger { + processorLoggerOnce.Do(func() { + // We use a rate limited logger to avoid spamming the logs + // due to issues communicating with Elasticsearch, for example. + processorLogger = logp.NewLogger( + logs.Stacktrace, + logs.WithRateLimit(time.Minute), + ) + }) + return processorLogger +} + +var ( + processorLoggerOnce sync.Once + processorLogger *logp.Logger +) diff --git a/sourcemap/processor_test.go b/sourcemap/processor_test.go index c9201cce0f5..5220ab0cadd 100644 --- a/sourcemap/processor_test.go +++ b/sourcemap/processor_test.go @@ -25,6 +25,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/apm-server/model" "github.com/elastic/apm-server/sourcemap" "github.com/elastic/apm-server/sourcemap/test" @@ -213,6 +215,47 @@ func TestBatchProcessor(t *testing.T) { }, error3) } +func TestBatchProcessorElasticsearchUnavailable(t *testing.T) { + client := test.ESClientUnavailable(t) + store, err := sourcemap.NewElasticsearchStore(client, "index", time.Minute) + require.NoError(t, err) + + metadata := model.Metadata{ + Service: model.Service{ + Name: "service_name", + Version: "service_version", + }, + } + nonMatchingFrame := model.StacktraceFrame{ + AbsPath: "bundle.js", + Lineno: newInt(0), + Colno: newInt(0), + Function: "original function", + } + + span := &model.Span{ + Metadata: metadata, + Stacktrace: model.Stacktrace{cloneFrame(nonMatchingFrame), cloneFrame(nonMatchingFrame)}, + } + + logp.DevelopmentSetup(logp.ToObserverOutput()) + for i := 0; i < 2; i++ { + processor := sourcemap.BatchProcessor{Store: store} + err = processor.ProcessBatch(context.Background(), &model.Batch{{Span: span}, {Span: span}}) + assert.NoError(t, err) + } + + // SourcemapError should have been set, but the frames should otherwise be unmodified. + expectedFrame := nonMatchingFrame + expectedFrame.SourcemapError = "failure querying ES: client error" + assert.Equal(t, model.Stacktrace{&expectedFrame, &expectedFrame}, span.Stacktrace) + + // We should have a single log message, due to rate limiting. + entries := logp.ObserverLogs().TakeAll() + require.Len(t, entries, 1) + assert.Equal(t, "failed to fetch source map: failure querying ES: client error", entries[0].Message) +} + func cloneFrame(frame model.StacktraceFrame) *model.StacktraceFrame { return &frame } From 552f3539f144453fd2f85ef4dbe300e180377671 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Thu, 8 Jul 2021 18:07:45 +0800 Subject: [PATCH 3/4] Introduce sourcemap timeout config --- _meta/beat.yml | 3 +++ apm-server.docker.yml | 3 +++ apm-server.yml | 3 +++ beater/api/mux.go | 5 +++- beater/config/config_test.go | 3 +++ beater/config/rum.go | 3 +++ sourcemap/processor.go | 18 +++++++++++-- sourcemap/processor_test.go | 52 ++++++++++++++++++++++++++++++++++++ 8 files changed, 87 insertions(+), 3 deletions(-) diff --git a/_meta/beat.yml b/_meta/beat.yml index bc24d73ffe5..85a9d68a570 100644 --- a/_meta/beat.yml +++ b/_meta/beat.yml @@ -263,6 +263,9 @@ apm-server: # Sourcemapping is enabled by default. #enabled: true + # Timeout for fetching source maps. + #timeout: 5s + # Source maps may be fetched from Elasticsearch by using the # output.elasticsearch configuration. # A different instance must be configured when using any other output. diff --git a/apm-server.docker.yml b/apm-server.docker.yml index 28f224082a2..360bc589a49 100644 --- a/apm-server.docker.yml +++ b/apm-server.docker.yml @@ -263,6 +263,9 @@ apm-server: # Sourcemapping is enabled by default. #enabled: true + # Timeout for fetching source maps. + #timeout: 5s + # Source maps may be fetched from Elasticsearch by using the # output.elasticsearch configuration. # A different instance must be configured when using any other output. diff --git a/apm-server.yml b/apm-server.yml index f78e05193a0..7aa29344217 100644 --- a/apm-server.yml +++ b/apm-server.yml @@ -263,6 +263,9 @@ apm-server: # Sourcemapping is enabled by default. #enabled: true + # Timeout for fetching source maps. + #timeout: 5s + # Source maps may be fetched from Elasticsearch by using the # output.elasticsearch configuration. # A different instance must be configured when using any other output. diff --git a/beater/api/mux.go b/beater/api/mux.go index e0cefcaa982..da33d95acd1 100644 --- a/beater/api/mux.go +++ b/beater/api/mux.go @@ -184,7 +184,10 @@ func (r *routeBuilder) rumIntakeHandler(newProcessor func(*config.Config) *strea batchProcessor := r.batchProcessor if r.sourcemapStore != nil { batchProcessor = modelprocessor.Chained{ - sourcemap.BatchProcessor{Store: r.sourcemapStore}, + sourcemap.BatchProcessor{ + Store: r.sourcemapStore, + Timeout: r.cfg.RumConfig.SourceMapping.Timeout, + }, batchProcessor, } } diff --git a/beater/config/config_test.go b/beater/config/config_test.go index 32e68dd031c..5b9547411df 100644 --- a/beater/config/config_test.go +++ b/beater/config/config_test.go @@ -104,6 +104,7 @@ func TestUnpackConfig(t *testing.T) { }, "index_pattern": "apm-test*", "elasticsearch.hosts": []string{"localhost:9201", "localhost:9202"}, + "timeout": "2s", }, "library_pattern": "^custom", "exclude_from_grouping": "^grouping", @@ -210,6 +211,7 @@ func TestUnpackConfig(t *testing.T) { Backoff: elasticsearch.DefaultBackoffConfig, }, Metadata: []SourceMapMetadata{}, + Timeout: 2 * time.Second, esConfigured: true, }, LibraryPattern: "^custom", @@ -393,6 +395,7 @@ func TestUnpackConfig(t *testing.T) { SourceMapURL: "http://somewhere.com/bundle.js.map", }, }, + Timeout: 5 * time.Second, }, LibraryPattern: "rum", ExcludeFromGrouping: "^/webpack", diff --git a/beater/config/rum.go b/beater/config/rum.go index 1e61e98b344..0f131d87de3 100644 --- a/beater/config/rum.go +++ b/beater/config/rum.go @@ -37,6 +37,7 @@ const ( defaultLibraryPattern = "node_modules|bower_components|~" defaultSourcemapCacheExpiration = 5 * time.Minute defaultSourcemapIndexPattern = "apm-*-sourcemap*" + defaultSourcemapTimeout = 5 * time.Second ) // RumConfig holds config information related to the RUM endpoint @@ -65,6 +66,7 @@ type SourceMapping struct { IndexPattern string `config:"index_pattern"` ESConfig *elasticsearch.Config `config:"elasticsearch"` Metadata []SourceMapMetadata `config:"metadata"` + Timeout time.Duration `config:"timeout" validate:"positive"` esConfigured bool } @@ -117,6 +119,7 @@ func defaultSourcemapping() SourceMapping { IndexPattern: defaultSourcemapIndexPattern, ESConfig: elasticsearch.DefaultConfig(), Metadata: []SourceMapMetadata{}, + Timeout: defaultSourcemapTimeout, } } diff --git a/sourcemap/processor.go b/sourcemap/processor.go index e734cabf8fb..cbc7b1eda43 100644 --- a/sourcemap/processor.go +++ b/sourcemap/processor.go @@ -29,15 +29,29 @@ import ( "github.com/elastic/apm-server/utility" ) -// BatchProcessor is a model.BatchProcessor that performs source -// mapping for span and error events. +// BatchProcessor is a model.BatchProcessor that performs source mapping for +// span and error events. Any errors fetching source maps, including the +// timeout expiring, will result in the StacktraceFrame.SourcemapError field +// being set; the error will not be returned. type BatchProcessor struct { + // Store is the store to use for fetching source maps. Store *Store + + // Timeout holds a timeout for each ProcessBatch call, to limit how + // much time is spent fetching source maps. + // + // If Timeout is <= 0, it will be ignored. + Timeout time.Duration } // ProcessBatch processes spans and errors, applying source maps // to their stack traces. func (p BatchProcessor) ProcessBatch(ctx context.Context, batch *model.Batch) error { + if p.Timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, p.Timeout) + defer cancel() + } for _, event := range *batch { switch { case event.Span != nil: diff --git a/sourcemap/processor_test.go b/sourcemap/processor_test.go index 5220ab0cadd..239c0e88af2 100644 --- a/sourcemap/processor_test.go +++ b/sourcemap/processor_test.go @@ -19,6 +19,7 @@ package sourcemap_test import ( "context" + "net/http" "testing" "time" @@ -27,6 +28,7 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/apm-server/elasticsearch" "github.com/elastic/apm-server/model" "github.com/elastic/apm-server/sourcemap" "github.com/elastic/apm-server/sourcemap/test" @@ -256,6 +258,50 @@ func TestBatchProcessorElasticsearchUnavailable(t *testing.T) { assert.Equal(t, "failed to fetch source map: failure querying ES: client error", entries[0].Message) } +func TestBatchProcessorTimeout(t *testing.T) { + var transport roundTripperFunc = func(req *http.Request) (*http.Response, error) { + <-req.Context().Done() + // TODO(axw) remove this "notTimeout" error wrapper when + // https://github.com/elastic/go-elasticsearch/issues/300 + // is fixed. + // + // Because context.DeadlineExceeded implements net.Error, + // go-elasticsearch continues retrying and does not exit + // early. + type notTimeout struct{ error } + return nil, notTimeout{req.Context().Err()} + } + client, err := elasticsearch.NewVersionedClient("", "", "", []string{""}, nil, transport, 3, elasticsearch.DefaultBackoff) + require.NoError(t, err) + store, err := sourcemap.NewElasticsearchStore(client, "index", time.Minute) + require.NoError(t, err) + + metadata := model.Metadata{ + Service: model.Service{ + Name: "service_name", + Version: "service_version", + }, + } + + frame := model.StacktraceFrame{ + AbsPath: "bundle.js", + Lineno: newInt(0), + Colno: newInt(0), + Function: "original function", + } + span := &model.Span{ + Metadata: metadata, + Stacktrace: model.Stacktrace{cloneFrame(frame)}, + } + + before := time.Now() + processor := sourcemap.BatchProcessor{Store: store, Timeout: 100 * time.Millisecond} + err = processor.ProcessBatch(context.Background(), &model.Batch{{Span: span}}) + assert.NoError(t, err) + taken := time.Since(before) + assert.Less(t, taken, time.Second) +} + func cloneFrame(frame model.StacktraceFrame) *model.StacktraceFrame { return &frame } @@ -263,3 +309,9 @@ func cloneFrame(frame model.StacktraceFrame) *model.StacktraceFrame { func newInt(v int) *int { return &v } + +type roundTripperFunc func(*http.Request) (*http.Response, error) + +func (f roundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) { + return f(req) +} From 08ab1519b637940a7005d41b295968504e57a61d Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Mon, 12 Jul 2021 14:31:41 +0800 Subject: [PATCH 4/4] Remove unnecessary fleetCfg param --- beater/beater.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/beater/beater.go b/beater/beater.go index 344095cb365..d21419e0e91 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -364,7 +364,7 @@ func (s *serverRunner) Start() { func (s *serverRunner) run() error { // Send config to telemetry. recordAPMServerConfig(s.config) - transformConfig := newTransformConfig(s.beat.Info, s.config, s.fleetConfig) + transformConfig := newTransformConfig(s.beat.Info, s.config) publisherConfig := &publish.PublisherConfig{ Info: s.beat.Info, Pipeline: s.config.Pipeline, @@ -625,7 +625,7 @@ func runServerWithTracerServer(runServer RunServerFunc, tracerServer *tracerServ } } -func newTransformConfig(beatInfo beat.Info, cfg *config.Config, fleetCfg *config.Fleet) *transform.Config { +func newTransformConfig(beatInfo beat.Info, cfg *config.Config) *transform.Config { return &transform.Config{ DataStreams: cfg.DataStreams.Enabled, RUM: transform.RUMConfig{