Skip to content

Commit

Permalink
Move source mapping to model processing (#5631)
Browse files Browse the repository at this point in the history
* Move source mapping to model processing

Source mapping transformation is rewritten as
a model.BatchProcessor, moved out of the model
package and into the sourcemap package.

One side-effect of the move to a processor, worth
debating, is that source mapping now occurs in
the HTTP handler goroutine rather than in the
publisher, which could increase request latency
slightly. On the other hand this means that source
mapping for more clients than there are CPUs can
now happen concurrently (the publisher is limited
to the number of CPUs in this regard); and the
handlers will block for up to one second anyway
if the publisher is busy/queue is full.

If a stack frame cannot be mapped, we no longer
set `sourcemap.error` or log anything. Just because
RUM and source mapping is enabled, does not mean
that all stacks _must_ be source mapped; therefore
these "errors" are mostly just noise. Likewise we
now only set `sourcemap.updated` when it is true.

* Reintroduce `sourcemap.error`

If there is an error fetching a source map,
we once again set `sourcemap.error` on the
frame and log the error at debug level. The
logging is now using zap's rate limited
logging, rather than storing in a map.

* Introduce sourcemap timeout config

* Remove unnecessary fleetCfg param

(cherry picked from commit ac3dc27)

# Conflicts:
#	changelogs/head.asciidoc
  • Loading branch information
axw authored and mergify-bot committed Jul 12, 2021
1 parent 60e132f commit 26f4338
Show file tree
Hide file tree
Showing 32 changed files with 1,096 additions and 1,226 deletions.
3 changes: 3 additions & 0 deletions _meta/beat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,9 @@ apm-server:
# Sourcemapping is enabled by default.
#enabled: true

# Timeout for fetching source maps.
#timeout: 5s

# Source maps may be fetched from Elasticsearch by using the
# output.elasticsearch configuration.
# A different instance must be configured when using any other output.
Expand Down
3 changes: 3 additions & 0 deletions apm-server.docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,9 @@ apm-server:
# Sourcemapping is enabled by default.
#enabled: true

# Timeout for fetching source maps.
#timeout: 5s

# Source maps may be fetched from Elasticsearch by using the
# output.elasticsearch configuration.
# A different instance must be configured when using any other output.
Expand Down
3 changes: 3 additions & 0 deletions apm-server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,9 @@ apm-server:
# Sourcemapping is enabled by default.
#enabled: true

# Timeout for fetching source maps.
#timeout: 5s

# Source maps may be fetched from Elasticsearch by using the
# output.elasticsearch configuration.
# A different instance must be configured when using any other output.
Expand Down
11 changes: 10 additions & 1 deletion beater/api/asset/sourcemap/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package sourcemap

import (
"context"
"errors"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -46,8 +47,14 @@ var (
validateError = monitoring.NewInt(registry, "validation.errors")
)

// AddedNotifier is an interface for notifying of sourcemap additions.
// This is implemented by sourcemap.Store.
type AddedNotifier interface {
// NotifyAdded notifies the implementation that a source map identified by
// the given service name, service version, and bundle filepath has been
// uploaded, e.g. so cached (negative) lookup results can be invalidated.
NotifyAdded(ctx context.Context, serviceName, serviceVersion, bundleFilepath string)
}

// Handler returns a request.Handler for managing asset requests.
func Handler(report publish.Reporter) request.Handler {
func Handler(report publish.Reporter, notifier AddedNotifier) request.Handler {
return func(c *request.Context) {
if c.Request.Method != "POST" {
c.Result.SetDefault(request.IDResponseErrorsMethodNotAllowed)
Expand Down Expand Up @@ -99,7 +106,9 @@ func Handler(report publish.Reporter) request.Handler {
c.Result.SetWithError(request.IDResponseErrorsFullQueue, err)
}
c.Write()
return
}
notifier.NotifyAdded(c.Request.Context(), smap.ServiceName, smap.ServiceVersion, smap.BundleFilepath)
c.Result.SetDefault(request.IDResponseValidAccepted)
c.Write()
}
Expand Down
12 changes: 11 additions & 1 deletion beater/api/asset/sourcemap/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ import (
"github.com/elastic/apm-server/transform"
)

// notifier is a test double for the AddedNotifier interface,
// recording whether NotifyAdded was invoked by the handler.
type notifier struct {
// notified is set to true once NotifyAdded has been called.
notified bool
}

// NotifyAdded records that a notification was received; the arguments are ignored.
func (n *notifier) NotifyAdded(ctx context.Context, serviceName, serviceVersion, bundleFilepath string) {
n.notified = true
}

func TestAssetHandler(t *testing.T) {
testcases := map[string]testcaseT{
"method": {
Expand Down Expand Up @@ -111,6 +119,7 @@ func TestAssetHandler(t *testing.T) {
// test assertion
assert.Equal(t, tc.code, tc.w.Code)
assert.Equal(t, tc.body, tc.w.Body.String())
assert.Equal(t, tc.code == http.StatusAccepted, tc.notifier.notified)
})
}
}
Expand All @@ -122,6 +131,7 @@ type testcaseT struct {
contentType string
reporter func(ctx context.Context, p publish.PendingReq) error
authorizer authorizerFunc
notifier notifier

missingSourcemap, missingServiceName, missingServiceVersion, missingBundleFilepath bool

Expand Down Expand Up @@ -178,7 +188,7 @@ func (tc *testcaseT) setup() error {
}
c := request.NewContext()
c.Reset(tc.w, tc.r)
h := Handler(tc.reporter)
h := Handler(tc.reporter, &tc.notifier)
h(c)
return nil
}
Expand Down
19 changes: 16 additions & 3 deletions beater/api/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/elastic/beats/v7/libbeat/monitoring"

"github.com/elastic/apm-server/agentcfg"
"github.com/elastic/apm-server/beater/api/asset/sourcemap"
apisourcemap "github.com/elastic/apm-server/beater/api/asset/sourcemap"
"github.com/elastic/apm-server/beater/api/config/agent"
"github.com/elastic/apm-server/beater/api/intake"
"github.com/elastic/apm-server/beater/api/profile"
Expand All @@ -42,6 +42,7 @@ import (
"github.com/elastic/apm-server/model/modelprocessor"
"github.com/elastic/apm-server/processor/stream"
"github.com/elastic/apm-server/publish"
"github.com/elastic/apm-server/sourcemap"
)

const (
Expand Down Expand Up @@ -84,6 +85,7 @@ func NewMux(
batchProcessor model.BatchProcessor,
fetcher agentcfg.Fetcher,
ratelimitStore *ratelimit.Store,
sourcemapStore *sourcemap.Store,
) (*http.ServeMux, error) {
pool := request.NewContextPool()
mux := http.NewServeMux()
Expand All @@ -101,6 +103,7 @@ func NewMux(
reporter: report,
batchProcessor: batchProcessor,
ratelimitStore: ratelimitStore,
sourcemapStore: sourcemapStore,
}

type route struct {
Expand Down Expand Up @@ -151,6 +154,7 @@ type routeBuilder struct {
reporter publish.Reporter
batchProcessor model.BatchProcessor
ratelimitStore *ratelimit.Store
sourcemapStore *sourcemap.Store
}

func (r *routeBuilder) profileHandler() (request.Handler, error) {
Expand Down Expand Up @@ -178,14 +182,23 @@ func (r *routeBuilder) rumIntakeHandler(newProcessor func(*config.Config) *strea
}
return func() (request.Handler, error) {
batchProcessor := r.batchProcessor
if r.sourcemapStore != nil {
batchProcessor = modelprocessor.Chained{
sourcemap.BatchProcessor{
Store: r.sourcemapStore,
Timeout: r.cfg.RumConfig.SourceMapping.Timeout,
},
batchProcessor,
}
}
batchProcessor = batchProcessorWithAllowedServiceNames(batchProcessor, r.cfg.RumConfig.AllowServiceNames)
h := intake.Handler(newProcessor(r.cfg), requestMetadataFunc, batchProcessor)
return middleware.Wrap(h, rumMiddleware(r.cfg, r.authenticator, r.ratelimitStore, intake.MonitoringMap)...)
}
}

func (r *routeBuilder) sourcemapHandler() (request.Handler, error) {
h := sourcemap.Handler(r.reporter)
h := apisourcemap.Handler(r.reporter, r.sourcemapStore)
return middleware.Wrap(h, sourcemapMiddleware(r.cfg, r.authenticator, r.ratelimitStore)...)
}

Expand Down Expand Up @@ -270,7 +283,7 @@ func sourcemapMiddleware(cfg *config.Config, auth *auth.Authenticator, ratelimit
msg = "When APM Server is managed by Fleet, Sourcemaps must be uploaded directly to Elasticsearch."
}
enabled := cfg.RumConfig.Enabled && cfg.RumConfig.SourceMapping.Enabled && !cfg.DataStreams.Enabled
backendMiddleware := backendMiddleware(cfg, auth, ratelimitStore, sourcemap.MonitoringMap)
backendMiddleware := backendMiddleware(cfg, auth, ratelimitStore, apisourcemap.MonitoringMap)
return append(backendMiddleware, middleware.KillSwitchMiddleware(enabled, msg))
}

Expand Down
13 changes: 11 additions & 2 deletions beater/api/mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/elastic/apm-server/beater/request"
"github.com/elastic/apm-server/model"
"github.com/elastic/apm-server/publish"
"github.com/elastic/apm-server/sourcemap"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/monitoring"
)
Expand Down Expand Up @@ -78,7 +79,11 @@ func requestToMuxer(cfg *config.Config, r *http.Request) (*httptest.ResponseReco
nopReporter := func(context.Context, publish.PendingReq) error { return nil }
nopBatchProcessor := model.ProcessBatchFunc(func(context.Context, *model.Batch) error { return nil })
ratelimitStore, _ := ratelimit.NewStore(1000, 1000, 1000)
mux, err := NewMux(beat.Info{Version: "1.2.3"}, cfg, nopReporter, nopBatchProcessor, agentcfg.NewFetcher(cfg), ratelimitStore)
var sourcemapStore *sourcemap.Store
mux, err := NewMux(
beat.Info{Version: "1.2.3"}, cfg,
nopReporter, nopBatchProcessor, agentcfg.NewFetcher(cfg), ratelimitStore, sourcemapStore,
)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -114,7 +119,11 @@ func newTestMux(t *testing.T, cfg *config.Config) http.Handler {
nopReporter := func(context.Context, publish.PendingReq) error { return nil }
nopBatchProcessor := model.ProcessBatchFunc(func(context.Context, *model.Batch) error { return nil })
ratelimitStore, _ := ratelimit.NewStore(1000, 1000, 1000)
mux, err := NewMux(beat.Info{Version: "1.2.3"}, cfg, nopReporter, nopBatchProcessor, agentcfg.NewFetcher(cfg), ratelimitStore)
var sourcemapStore *sourcemap.Store
mux, err := NewMux(
beat.Info{Version: "1.2.3"}, cfg,
nopReporter, nopBatchProcessor, agentcfg.NewFetcher(cfg), ratelimitStore, sourcemapStore,
)
require.NoError(t, err)
return mux
}
29 changes: 13 additions & 16 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,10 +364,7 @@ func (s *serverRunner) Start() {
func (s *serverRunner) run() error {
// Send config to telemetry.
recordAPMServerConfig(s.config)
transformConfig, err := newTransformConfig(s.beat.Info, s.config, s.fleetConfig)
if err != nil {
return err
}
transformConfig := newTransformConfig(s.beat.Info, s.config)
publisherConfig := &publish.PublisherConfig{
Info: s.beat.Info,
Pipeline: s.config.Pipeline,
Expand All @@ -387,6 +384,15 @@ func (s *serverRunner) run() error {
}()
}

var sourcemapStore *sourcemap.Store
if s.config.RumConfig.Enabled && s.config.RumConfig.SourceMapping.Enabled {
store, err := newSourcemapStore(s.beat.Info, s.config.RumConfig.SourceMapping, s.fleetConfig)
if err != nil {
return err
}
sourcemapStore = store
}

// When the publisher stops cleanly it will close its pipeline client,
// calling the acker's Close method. We need to call Open for each new
// publisher to ensure we wait for all clients and enqueued events to
Expand Down Expand Up @@ -432,6 +438,7 @@ func (s *serverRunner) run() error {
Logger: s.logger,
Tracer: s.tracer,
BatchProcessor: batchProcessor,
SourcemapStore: sourcemapStore,
}); err != nil {
return err
}
Expand Down Expand Up @@ -618,24 +625,14 @@ func runServerWithTracerServer(runServer RunServerFunc, tracerServer *tracerServ
}
}

func newTransformConfig(beatInfo beat.Info, cfg *config.Config, fleetCfg *config.Fleet) (*transform.Config, error) {
transformConfig := &transform.Config{
func newTransformConfig(beatInfo beat.Info, cfg *config.Config) *transform.Config {
return &transform.Config{
DataStreams: cfg.DataStreams.Enabled,
RUM: transform.RUMConfig{
LibraryPattern: regexp.MustCompile(cfg.RumConfig.LibraryPattern),
ExcludeFromGrouping: regexp.MustCompile(cfg.RumConfig.ExcludeFromGrouping),
},
}

if cfg.RumConfig.Enabled && cfg.RumConfig.SourceMapping.Enabled {
store, err := newSourcemapStore(beatInfo, cfg.RumConfig.SourceMapping, fleetCfg)
if err != nil {
return nil, err
}
transformConfig.RUM.SourcemapStore = store
}

return transformConfig, nil
}

func newSourcemapStore(beatInfo beat.Info, cfg config.SourceMapping, fleetCfg *config.Fleet) (*sourcemap.Store, error) {
Expand Down
35 changes: 6 additions & 29 deletions beater/beater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,9 @@ func TestTransformConfigIndex(t *testing.T) {
cfg.RumConfig.SourceMapping.IndexPattern = indexPattern
}

transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg, nil)
store, err := newSourcemapStore(beat.Info{Version: "1.2.3"}, cfg.RumConfig.SourceMapping, nil)
require.NoError(t, err)
require.NotNil(t, transformConfig.RUM.SourcemapStore)
transformConfig.RUM.SourcemapStore.Added(context.Background(), "name", "version", "path")
store.NotifyAdded(context.Background(), "name", "version", "path")
require.Len(t, requestPaths, 1)

path := requestPaths[0]
Expand All @@ -267,26 +266,6 @@ func TestTransformConfigIndex(t *testing.T) {
t.Run("with-observer-version", func(t *testing.T) { test(t, "blah-%{[observer.version]}-blah", "blah-1.2.3-blah") })
}

func TestTransformConfig(t *testing.T) {
test := func(rumEnabled, sourcemapEnabled bool, expectSourcemapStore bool) {
cfg := config.DefaultConfig()
cfg.RumConfig.Enabled = rumEnabled
cfg.RumConfig.SourceMapping.Enabled = sourcemapEnabled
transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg, nil)
require.NoError(t, err)
if expectSourcemapStore {
assert.NotNil(t, transformConfig.RUM.SourcemapStore)
} else {
assert.Nil(t, transformConfig.RUM.SourcemapStore)
}
}

test(false, false, false)
test(false, true, false)
test(true, false, false)
test(true, true, true)
}

func TestStoreUsesRUMElasticsearchConfig(t *testing.T) {
var called bool
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -301,11 +280,11 @@ func TestStoreUsesRUMElasticsearchConfig(t *testing.T) {
cfg.RumConfig.SourceMapping.ESConfig = elasticsearch.DefaultConfig()
cfg.RumConfig.SourceMapping.ESConfig.Hosts = []string{ts.URL}

transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg, nil)
store, err := newSourcemapStore(beat.Info{Version: "1.2.3"}, cfg.RumConfig.SourceMapping, nil)
require.NoError(t, err)
// Check that the provided rum elasticsearch config was used and
// Fetch() goes to the test server.
_, err = transformConfig.RUM.SourcemapStore.Fetch(context.Background(), "app", "1.0", "/bundle/path")
_, err = store.Fetch(context.Background(), "app", "1.0", "/bundle/path")
require.NoError(t, err)

assert.True(t, called)
Expand Down Expand Up @@ -338,11 +317,9 @@ func TestFleetStoreUsed(t *testing.T) {
TLS: nil,
}

transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg, fleetCfg)
store, err := newSourcemapStore(beat.Info{Version: "1.2.3"}, cfg.RumConfig.SourceMapping, fleetCfg)
require.NoError(t, err)
// Check that the provided rum elasticsearch config was used and
// Fetch() goes to the test server.
_, err = transformConfig.RUM.SourcemapStore.Fetch(context.Background(), "app", "1.0", "/bundle/path")
_, err = store.Fetch(context.Background(), "app", "1.0", "/bundle/path")
require.NoError(t, err)

assert.True(t, called)
Expand Down
3 changes: 3 additions & 0 deletions beater/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func TestUnpackConfig(t *testing.T) {
},
"index_pattern": "apm-test*",
"elasticsearch.hosts": []string{"localhost:9201", "localhost:9202"},
"timeout": "2s",
},
"library_pattern": "^custom",
"exclude_from_grouping": "^grouping",
Expand Down Expand Up @@ -210,6 +211,7 @@ func TestUnpackConfig(t *testing.T) {
Backoff: elasticsearch.DefaultBackoffConfig,
},
Metadata: []SourceMapMetadata{},
Timeout: 2 * time.Second,
esConfigured: true,
},
LibraryPattern: "^custom",
Expand Down Expand Up @@ -393,6 +395,7 @@ func TestUnpackConfig(t *testing.T) {
SourceMapURL: "http://somewhere.com/bundle.js.map",
},
},
Timeout: 5 * time.Second,
},
LibraryPattern: "rum",
ExcludeFromGrouping: "^/webpack",
Expand Down
3 changes: 3 additions & 0 deletions beater/config/rum.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
defaultLibraryPattern = "node_modules|bower_components|~"
defaultSourcemapCacheExpiration = 5 * time.Minute
defaultSourcemapIndexPattern = "apm-*-sourcemap*"
defaultSourcemapTimeout = 5 * time.Second
)

// RumConfig holds config information related to the RUM endpoint
Expand Down Expand Up @@ -65,6 +66,7 @@ type SourceMapping struct {
IndexPattern string `config:"index_pattern"`
ESConfig *elasticsearch.Config `config:"elasticsearch"`
Metadata []SourceMapMetadata `config:"metadata"`
Timeout time.Duration `config:"timeout" validate:"positive"`
esConfigured bool
}

Expand Down Expand Up @@ -117,6 +119,7 @@ func defaultSourcemapping() SourceMapping {
IndexPattern: defaultSourcemapIndexPattern,
ESConfig: elasticsearch.DefaultConfig(),
Metadata: []SourceMapMetadata{},
Timeout: defaultSourcemapTimeout,
}
}

Expand Down
Loading

0 comments on commit 26f4338

Please sign in to comment.