From d071010d6ac8b551b64a10a6c43e78c0437fe700 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Wed, 23 Jun 2021 14:41:15 +0200 Subject: [PATCH] [7.x] Service specific source maps (backport #5410) (#5524) * Service specific source maps (#5410) Serve service-specific sourcemaps from fleet-server (cherry picked from commit 9453fe24064ea429a82160c3321ea09a6c4603a4) # Conflicts: # apmpackage/apm/manifest.yml # changelogs/head.asciidoc * Update manifest.yml * Delete head.asciidoc Co-authored-by: stuart nelson --- _meta/beat.yml | 5 +- apm-server.docker.yml | 5 +- apm-server.yml | 5 +- apmpackage/apm/manifest.yml | 8 +- beater/beater.go | 68 +++++++--- beater/beater_test.go | 68 +++++++++- beater/config/config_test.go | 17 +++ beater/config/integration.go | 7 +- beater/config/rum.go | 22 ++-- beater/config/sourcemapping.go | 26 ++++ model/error_test.go | 2 +- model/sourcemap_test.go | 2 +- model/stacktrace_frame_test.go | 2 +- sourcemap/es_store.go | 15 +++ sourcemap/fleet_store.go | 139 +++++++++++++++++++++ sourcemap/fleet_store_test.go | 79 ++++++++++++ sourcemap/store.go | 82 +++++++++--- sourcemap/store_test.go | 139 ++++++++++++++++++++- tests/system/test_integration_sourcemap.py | 23 ---- 19 files changed, 619 insertions(+), 95 deletions(-) create mode 100644 beater/config/sourcemapping.go create mode 100644 sourcemap/fleet_store.go create mode 100644 sourcemap/fleet_store_test.go diff --git a/_meta/beat.yml b/_meta/beat.yml index 9dee7ba5fb5..bc24d73ffe5 100644 --- a/_meta/beat.yml +++ b/_meta/beat.yml @@ -263,9 +263,12 @@ apm-server: # Sourcemapping is enabled by default. #enabled: true - # Source maps are always fetched from Elasticsearch, by default using the output.elasticsearch configuration. + # Source maps may be fetched from Elasticsearch by using the + # output.elasticsearch configuration. # A different instance must be configured when using any other output. # This setting only affects sourcemap reads - the output determines where sourcemaps are written. + # Note: Configuring elasticsearch is not supported if apm-server is being + # managed by Fleet. #elasticsearch: # Array of hosts to connect to. # Scheme and port can be left out and will be set to the default (`http` and `9200`). diff --git a/apm-server.docker.yml b/apm-server.docker.yml index 12a6cbf82c6..28f224082a2 100644 --- a/apm-server.docker.yml +++ b/apm-server.docker.yml @@ -263,9 +263,12 @@ apm-server: # Sourcemapping is enabled by default. #enabled: true - # Source maps are always fetched from Elasticsearch, by default using the output.elasticsearch configuration. + # Source maps may be fetched from Elasticsearch by using the + # output.elasticsearch configuration. # A different instance must be configured when using any other output. # This setting only affects sourcemap reads - the output determines where sourcemaps are written. + # Note: Configuring elasticsearch is not supported if apm-server is being + # managed by Fleet. #elasticsearch: # Array of hosts to connect to. # Scheme and port can be left out and will be set to the default (`http` and `9200`). diff --git a/apm-server.yml b/apm-server.yml index 7ac9574daad..f78e05193a0 100644 --- a/apm-server.yml +++ b/apm-server.yml @@ -263,9 +263,12 @@ apm-server: # Sourcemapping is enabled by default. #enabled: true - # Source maps are always fetched from Elasticsearch, by default using the output.elasticsearch configuration. + # Source maps may be fetched from Elasticsearch by using the + # output.elasticsearch configuration. # A different instance must be configured when using any other output. # This setting only affects sourcemap reads - the output determines where sourcemaps are written. + # Note: Configuring elasticsearch is not supported if apm-server is being + # managed by Fleet. #elasticsearch: # Array of hosts to connect to. # Scheme and port can be left out and will be set to the default (`http` and `9200`). diff --git a/apmpackage/apm/manifest.yml b/apmpackage/apm/manifest.yml index bed61c422ee..537277394fa 100644 --- a/apmpackage/apm/manifest.yml +++ b/apmpackage/apm/manifest.yml @@ -119,13 +119,7 @@ policy_templates: description: Number of unique IPs to be cached for the rate limiter. required: false show_user: false - default: 1000 - - name: sourcemap_api_key - type: text - title: RUM - API Key for Sourcemaps - required: false - description: API Key for sourcemap feature. Enter as : - show_user: false + default: 10000 - name: api_key_limit type: integer title: Maximum number of API Keys for Agent authentication diff --git a/beater/beater.go b/beater/beater.go index 0945ed8d549..c45177e05b9 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -20,6 +20,7 @@ package beater import ( "context" "net" + "net/http" "regexp" "runtime" "strings" @@ -27,11 +28,12 @@ import ( "time" "github.com/elastic/beats/v7/libbeat/common/fleetmode" - - "github.com/elastic/beats/v7/libbeat/kibana" + "github.com/elastic/beats/v7/libbeat/common/transport" + "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" "github.com/pkg/errors" "go.elastic.co/apm" + "go.elastic.co/apm/module/apmhttp" "golang.org/x/sync/errgroup" "github.com/elastic/beats/v7/libbeat/beat" @@ -256,8 +258,8 @@ func (s *serverCreator) Create(p beat.PipelineConnector, rawConfig *common.Confi sharedServerRunnerParams: s.args, Namespace: namespace, Pipeline: p, - KibanaConfig: &integrationConfig.Fleet.Kibana, RawConfig: apmServerCommonConfig, + FleetConfig: &integrationConfig.Fleet, }) } @@ -279,6 +281,7 @@ type serverRunner struct { acker *waitPublishedAcker namespace string config *config.Config + fleetConfig *config.Fleet beat *beat.Beat logger *logp.Logger tracer *apm.Tracer @@ -289,10 +292,10 @@ type serverRunner struct { type serverRunnerParams struct { sharedServerRunnerParams - Namespace string - Pipeline beat.PipelineConnector - KibanaConfig *kibana.ClientConfig - RawConfig *common.Config + Namespace string + Pipeline beat.PipelineConnector + RawConfig *common.Config + FleetConfig *config.Fleet } type sharedServerRunnerParams struct { @@ -310,10 +313,6 @@ func newServerRunner(ctx context.Context, args serverRunnerParams) (*serverRunne return nil, err } - if cfg.DataStreams.Enabled && args.KibanaConfig != nil { - cfg.Kibana.ClientConfig = *args.KibanaConfig - } - runServerContext, cancel := context.WithCancel(ctx) return &serverRunner{ backgroundContext: ctx, @@ -321,6 +320,7 @@ func newServerRunner(ctx context.Context, args serverRunnerParams) (*serverRunne cancelRunServerContext: cancel, config: cfg, + fleetConfig: args.FleetConfig, acker: args.Acker, pipeline: args.Pipeline, namespace: args.Namespace, @@ -359,8 +359,7 @@ func (s *serverRunner) Start() { func (s *serverRunner) run() error { // Send config to telemetry. recordAPMServerConfig(s.config) - - transformConfig, err := newTransformConfig(s.beat.Info, s.config) + transformConfig, err := newTransformConfig(s.beat.Info, s.config, s.fleetConfig) if err != nil { return err } @@ -602,7 +601,7 @@ func runServerWithTracerServer(runServer RunServerFunc, tracerServer *tracerServ } } -func newTransformConfig(beatInfo beat.Info, cfg *config.Config) (*transform.Config, error) { +func newTransformConfig(beatInfo beat.Info, cfg *config.Config, fleetCfg *config.Fleet) (*transform.Config, error) { transformConfig := &transform.Config{ DataStreams: cfg.DataStreams.Enabled, RUM: transform.RUMConfig{ @@ -611,8 +610,8 @@ func newTransformConfig(beatInfo beat.Info, cfg *config.Config) (*transform.Conf }, } - if cfg.RumConfig.Enabled && cfg.RumConfig.SourceMapping.Enabled && cfg.RumConfig.SourceMapping.ESConfig != nil { - store, err := newSourcemapStore(beatInfo, cfg.RumConfig.SourceMapping) + if cfg.RumConfig.Enabled && cfg.RumConfig.SourceMapping.Enabled { + store, err := newSourcemapStore(beatInfo, cfg.RumConfig.SourceMapping, fleetCfg) if err != nil { return nil, err } @@ -622,13 +621,44 @@ func newTransformConfig(beatInfo beat.Info, cfg *config.Config) (*transform.Conf return transformConfig, nil } -func newSourcemapStore(beatInfo beat.Info, cfg config.SourceMapping) (*sourcemap.Store, error) { - esClient, err := elasticsearch.NewClient(cfg.ESConfig) +func newSourcemapStore(beatInfo beat.Info, cfg config.SourceMapping, fleetCfg *config.Fleet) (*sourcemap.Store, error) { + if fleetCfg != nil { + var ( + c = *http.DefaultClient + rt = http.DefaultTransport + ) + var tlsConfig *tlscommon.TLSConfig + var err error + if fleetCfg.TLS.IsEnabled() { + if tlsConfig, err = tlscommon.LoadTLSConfig(fleetCfg.TLS); err != nil { + return nil, err + } + } + + // Default for es is 90s :shrug: + timeout := 30 * time.Second + dialer := transport.NetDialer(timeout) + tlsDialer, err := transport.TLSDialer(dialer, tlsConfig, timeout) + if err != nil { + return nil, err + } + + rt = &http.Transport{ + Proxy: http.ProxyFromEnvironment, + Dial: dialer.Dial, + DialTLS: tlsDialer.Dial, + TLSClientConfig: tlsConfig.ToConfig(), + } + + c.Transport = apmhttp.WrapRoundTripper(rt) + return sourcemap.NewFleetStore(&c, fleetCfg, cfg.Metadata, cfg.Cache.Expiration) + } + c, err := elasticsearch.NewClient(cfg.ESConfig) if err != nil { return nil, err } index := strings.ReplaceAll(cfg.IndexPattern, "%{[observer.version]}", beatInfo.Version) - return sourcemap.NewStore(esClient, index, cfg.Cache.Expiration) + return sourcemap.NewElasticsearchStore(c, index, cfg.Cache.Expiration) } // WrapRunServerWithProcessors wraps runServer such that it wraps args.Reporter diff --git a/beater/beater_test.go b/beater/beater_test.go index b2123af23f2..b7295aac0c0 100644 --- a/beater/beater_test.go +++ b/beater/beater_test.go @@ -18,6 +18,7 @@ package beater import ( + "compress/zlib" "context" "errors" "net" @@ -36,7 +37,9 @@ import ( "go.uber.org/zap/zaptest/observer" "github.com/elastic/apm-server/beater/config" + "github.com/elastic/apm-server/elasticsearch" "github.com/elastic/apm-server/model" + "github.com/elastic/apm-server/sourcemap/test" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/instrumentation" @@ -246,7 +249,7 @@ func TestTransformConfigIndex(t *testing.T) { cfg.RumConfig.SourceMapping.IndexPattern = indexPattern } - transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg) + transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg, nil) require.NoError(t, err) require.NotNil(t, transformConfig.RUM.SourcemapStore) transformConfig.RUM.SourcemapStore.Added(context.Background(), "name", "version", "path") @@ -266,7 +269,7 @@ func TestTransformConfig(t *testing.T) { cfg := config.DefaultConfig() cfg.RumConfig.Enabled = rumEnabled cfg.RumConfig.SourceMapping.Enabled = sourcemapEnabled - transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg) + transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg, nil) require.NoError(t, err) if expectSourcemapStore { assert.NotNil(t, transformConfig.RUM.SourcemapStore) @@ -280,3 +283,64 @@ func TestTransformConfig(t *testing.T) { test(true, false, false) test(true, true, true) } + +func TestStoreUsesRUMElasticsearchConfig(t *testing.T) { + var called bool + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called = true + w.Write([]byte(test.ValidSourcemap)) + })) + defer ts.Close() + + cfg := config.DefaultConfig() + cfg.RumConfig.Enabled = true + cfg.RumConfig.SourceMapping.Enabled = true + cfg.RumConfig.SourceMapping.ESConfig = elasticsearch.DefaultConfig() + cfg.RumConfig.SourceMapping.ESConfig.Hosts = []string{ts.URL} + + transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg, nil) + require.NoError(t, err) + // Check that the provided rum elasticsearch config was used and + // Fetch() goes to the test server. + _, err = transformConfig.RUM.SourcemapStore.Fetch(context.Background(), "app", "1.0", "/bundle/path") + require.NoError(t, err) + + assert.True(t, called) +} + +func TestFleetStoreUsed(t *testing.T) { + var called bool + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called = true + wr := zlib.NewWriter(w) + defer wr.Close() + wr.Write([]byte(test.ValidSourcemap)) + })) + defer ts.Close() + + cfg := config.DefaultConfig() + cfg.RumConfig.Enabled = true + cfg.RumConfig.SourceMapping.Enabled = true + cfg.RumConfig.SourceMapping.Metadata = []config.SourceMapMetadata{{ + ServiceName: "app", + ServiceVersion: "1.0", + BundleFilepath: "/bundle/path", + SourceMapURL: "/my/path", + }} + + fleetCfg := &config.Fleet{ + Hosts: []string{ts.URL[7:]}, + Protocol: "http", + AccessAPIKey: "my-key", + TLS: nil, + } + + transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg, fleetCfg) + require.NoError(t, err) + // Check that the provided rum elasticsearch config was used and + // Fetch() goes to the test server. + _, err = transformConfig.RUM.SourcemapStore.Fetch(context.Background(), "app", "1.0", "/bundle/path") + require.NoError(t, err) + + assert.True(t, called) +} diff --git a/beater/config/config_test.go b/beater/config/config_test.go index 9b391b9cc2e..32e68dd031c 100644 --- a/beater/config/config_test.go +++ b/beater/config/config_test.go @@ -209,6 +209,7 @@ func TestUnpackConfig(t *testing.T) { MaxRetries: 3, Backoff: elasticsearch.DefaultBackoffConfig, }, + Metadata: []SourceMapMetadata{}, esConfigured: true, }, LibraryPattern: "^custom", @@ -295,6 +296,14 @@ func TestUnpackConfig(t *testing.T) { "rum": map[string]interface{}{ "enabled": true, "source_mapping": map[string]interface{}{ + "metadata": []map[string]string{ + { + "service.name": "opbeans-rum", + "service.version": "1.2.3", + "bundle.filepath": "/test/e2e/general-usecase/bundle.js.map", + "sourcemap.url": "http://somewhere.com/bundle.js.map", + }, + }, "cache": map[string]interface{}{ "expiration": 7, }, @@ -376,6 +385,14 @@ func TestUnpackConfig(t *testing.T) { }, IndexPattern: "apm-*-sourcemap*", ESConfig: elasticsearch.DefaultConfig(), + Metadata: []SourceMapMetadata{ + { + ServiceName: "opbeans-rum", + ServiceVersion: "1.2.3", + BundleFilepath: "/test/e2e/general-usecase/bundle.js.map", + SourceMapURL: "http://somewhere.com/bundle.js.map", + }, + }, }, LibraryPattern: "rum", ExcludeFromGrouping: "^/webpack", diff --git a/beater/config/integration.go b/beater/config/integration.go index 99c5c321e15..9c6c61d509f 100644 --- a/beater/config/integration.go +++ b/beater/config/integration.go @@ -21,7 +21,7 @@ import ( "errors" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/kibana" + "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" ) func NewIntegrationConfig(rootConfig *common.Config) (*IntegrationConfig, error) { @@ -66,5 +66,8 @@ type Package struct { } type Fleet struct { - Kibana kibana.ClientConfig `config:"kibana"` + Hosts []string `config:"hosts"` + Protocol string `config:"protocol"` + AccessAPIKey string `config:"access_api_key"` + TLS *tlscommon.Config `config:"ssl"` } diff --git a/beater/config/rum.go b/beater/config/rum.go index 000280d9c3a..1e61e98b344 100644 --- a/beater/config/rum.go +++ b/beater/config/rum.go @@ -58,12 +58,13 @@ type EventRate struct { LruSize int `config:"lru_size"` } -// SourceMapping holds sourecemap config information +// SourceMapping holds sourcemap config information type SourceMapping struct { Cache Cache `config:"cache"` Enabled bool `config:"enabled"` IndexPattern string `config:"index_pattern"` ESConfig *elasticsearch.Config `config:"elasticsearch"` + Metadata []SourceMapMetadata `config:"metadata"` esConfigured bool } @@ -79,14 +80,13 @@ func (c *RumConfig) setup(log *logp.Logger, dataStreamsEnabled bool, outputESCfg return errors.Wrapf(err, "Invalid regex for `exclude_from_grouping`: ") } - var apiKey string - if c.SourceMapping.esConfigured { - if dataStreamsEnabled { - // when running under Fleet, the only setting configured is the api key - apiKey = c.SourceMapping.ESConfig.APIKey - } else { - return nil - } + if c.SourceMapping.esConfigured && len(c.SourceMapping.Metadata) > 0 { + return errors.New("configuring both source_mapping.elasticsearch and sourcemapping.source_maps not allowed") + } + + // No need to unpack the ESConfig if SourceMapMetadata exist + if len(c.SourceMapping.Metadata) > 0 { + return nil } // fall back to elasticsearch output configuration for sourcemap storage if possible @@ -98,9 +98,6 @@ func (c *RumConfig) setup(log *logp.Logger, dataStreamsEnabled bool, outputESCfg if err := outputESCfg.Unpack(c.SourceMapping.ESConfig); err != nil { return errors.Wrap(err, "unpacking Elasticsearch config into Sourcemap config") } - if c.SourceMapping.ESConfig.APIKey == "" { - c.SourceMapping.ESConfig.APIKey = apiKey - } return nil } @@ -119,6 +116,7 @@ func defaultSourcemapping() SourceMapping { Cache: Cache{Expiration: defaultSourcemapCacheExpiration}, IndexPattern: defaultSourcemapIndexPattern, ESConfig: elasticsearch.DefaultConfig(), + Metadata: []SourceMapMetadata{}, } } diff --git a/beater/config/sourcemapping.go b/beater/config/sourcemapping.go new file mode 100644 index 00000000000..ab98cf44426 --- /dev/null +++ b/beater/config/sourcemapping.go @@ -0,0 +1,26 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package config + +// SourceMapMetadata holds source map configuration information. +type SourceMapMetadata struct { + ServiceName string `config:"service.name"` + ServiceVersion string `config:"service.version"` + BundleFilepath string `config:"bundle.filepath"` + SourceMapURL string `config:"sourcemap.url"` +} diff --git a/model/error_test.go b/model/error_test.go index 9bd019d9a24..781b20f79f1 100644 --- a/model/error_test.go +++ b/model/error_test.go @@ -805,7 +805,7 @@ func TestSourcemapping(t *testing.T) { assert.Equal(t, 1, *event.Exception.Stacktrace[0].Lineno) // transform with sourcemap store - store, err := sourcemap.NewStore(test.ESClientWithValidSourcemap(t), "apm-*sourcemap*", time.Minute) + store, err := sourcemap.NewElasticsearchStore(test.ESClientWithValidSourcemap(t), "apm-*sourcemap*", time.Minute) require.NoError(t, err) transformedWithSourcemap := event.fields(context.Background(), &transform.Config{ RUM: transform.RUMConfig{SourcemapStore: store}, diff --git a/model/sourcemap_test.go b/model/sourcemap_test.go index 2b9e6d20414..6ddec9ec273 100644 --- a/model/sourcemap_test.go +++ b/model/sourcemap_test.go @@ -86,7 +86,7 @@ func TestInvalidateCache(t *testing.T) { // create sourcemap store client, err := estest.NewElasticsearchClient(estest.NewTransport(t, http.StatusOK, nil)) require.NoError(t, err) - store, err := sourcemap.NewStore(client, "foo", time.Minute) + store, err := sourcemap.NewElasticsearchStore(client, "foo", time.Minute) require.NoError(t, err) // transform with sourcemap store diff --git a/model/stacktrace_frame_test.go b/model/stacktrace_frame_test.go index b3f00cd8e56..8c890b8afbb 100644 --- a/model/stacktrace_frame_test.go +++ b/model/stacktrace_frame_test.go @@ -422,7 +422,7 @@ func TestLibraryFrame(t *testing.T) { } func testSourcemapStore(t *testing.T, client elasticsearch.Client) *sourcemap.Store { - store, err := sourcemap.NewStore(client, "apm-*sourcemap*", time.Minute) + store, err := sourcemap.NewElasticsearchStore(client, "apm-*sourcemap*", time.Minute) require.NoError(t, err) return store } diff --git a/sourcemap/es_store.go b/sourcemap/es_store.go index 84afd0ca7c8..ad999f4d17e 100644 --- a/sourcemap/es_store.go +++ b/sourcemap/es_store.go @@ -25,12 +25,14 @@ import ( "io" "io/ioutil" "net/http" + "time" "github.com/pkg/errors" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/apm-server/elasticsearch" + logs "github.com/elastic/apm-server/log" "github.com/elastic/apm-server/utility" ) @@ -65,6 +67,19 @@ type esSourcemapResponse struct { } `json:"hits"` } +// NewElasticsearchStore returns an instance of Store for interacting with +// sourcemaps stored in ElasticSearch. +func NewElasticsearchStore( + c elasticsearch.Client, + index string, + expiration time.Duration, +) (*Store, error) { + logger := logp.NewLogger(logs.Sourcemap) + s := &esStore{c, index, logger} + + return newStore(s, logger, expiration) +} + func (s *esStore) fetch(ctx context.Context, name, version, path string) (string, error) { statusCode, body, err := s.runSearchQuery(ctx, name, version, path) if err != nil { diff --git a/sourcemap/fleet_store.go b/sourcemap/fleet_store.go new file mode 100644 index 00000000000..f2108435952 --- /dev/null +++ b/sourcemap/fleet_store.go @@ -0,0 +1,139 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package sourcemap + +import ( + "bytes" + "compress/zlib" + "context" + "fmt" + "io" + "io/ioutil" + "net/http" + "time" + + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + + "github.com/elastic/apm-server/beater/config" + logs "github.com/elastic/apm-server/log" +) + +type fleetStore struct { + apikey string + c *http.Client + fleetURLs map[key]string +} + +type key struct { + ServiceName string + ServiceVersion string + BundleFilepath string +} + +// NewFleetStore returns an instance of Store for interacting with sourcemaps +// stored in Fleet-Server. +func NewFleetStore( + c *http.Client, + fleetCfg *config.Fleet, + cfgs []config.SourceMapMetadata, + expiration time.Duration, +) (*Store, error) { + if len(fleetCfg.Hosts) < 1 { + return nil, errors.New("no fleet hosts present for fleet store") + } + logger := logp.NewLogger(logs.Sourcemap) + s, err := newFleetStore(c, fleetCfg, cfgs) + if err != nil { + return nil, err + } + return newStore(s, logger, expiration) +} + +func newFleetStore( + c *http.Client, + fleetCfg *config.Fleet, + cfgs []config.SourceMapMetadata, +) (fleetStore, error) { + // TODO(stn): Add support for multiple fleet hosts + // cf. https://github.com/elastic/apm-server/issues/5514 + host := fleetCfg.Hosts[0] + fleetURLs := make(map[key]string) + + for _, cfg := range cfgs { + k := key{cfg.ServiceName, cfg.ServiceVersion, cfg.BundleFilepath} + u, err := common.MakeURL(fleetCfg.Protocol, cfg.SourceMapURL, host, 8220) + if err != nil { + return fleetStore{}, err + } + fleetURLs[k] = u + } + return fleetStore{ + apikey: "ApiKey " + fleetCfg.AccessAPIKey, + fleetURLs: fleetURLs, + c: c, + }, nil +} + +func (f fleetStore) fetch(ctx context.Context, name, version, path string) (string, error) { + k := key{name, version, path} + fleetURL, ok := f.fleetURLs[k] + if !ok { + return "", fmt.Errorf("unable to find sourcemap.url for service.name=%s service.version=%s bundle.path=%s", + name, version, path, + ) + } + + req, err := http.NewRequest(http.MethodGet, fleetURL, nil) + if err != nil { + return "", err + } + req.Header.Add("Authorization", f.apikey) + + resp, err := f.c.Do(req.WithContext(ctx)) + if err != nil { + return "", err + } + defer resp.Body.Close() + + // Verify that we should only get 200 back from fleet-server + if resp.StatusCode != http.StatusOK { + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return "", fmt.Errorf("failure querying fleet: statuscode=%d response=(failed to read body)", resp.StatusCode) + } + return "", fmt.Errorf("failure querying fleet: statuscode=%d response=%s", resp.StatusCode, body) + } + + // Looking at the index in elasticsearch, currently + // - no encryption + // - zlib compression + r, err := zlib.NewReader(resp.Body) + if err != nil { + return "", err + } + + buf := new(bytes.Buffer) + if _, err := io.Copy(buf, r); err != nil { + return "", err + } + + return buf.String(), nil +} diff --git a/sourcemap/fleet_store_test.go b/sourcemap/fleet_store_test.go new file mode 100644 index 00000000000..66b36340e96 --- /dev/null +++ b/sourcemap/fleet_store_test.go @@ -0,0 +1,79 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package sourcemap + +import ( + "compress/zlib" + "context" + "net/http" + "net/http/httptest" + "testing" + + "github.com/elastic/apm-server/beater/config" + + "github.com/stretchr/testify/assert" +) + +func TestFleetFetch(t *testing.T) { + var ( + hasAuth bool + apikey = "supersecret" + name = "webapp" + version = "1.0.0" + path = "/my/path/to/bundle.js.map" + wantRes = "sourcemap response" + c = http.DefaultClient + sourceMapPath = "/api/fleet/artifact" + ) + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, sourceMapPath, r.URL.Path) + auth := r.Header.Get("Authorization") + hasAuth = auth == "ApiKey "+apikey + // zlib compress + wr := zlib.NewWriter(w) + defer wr.Close() + wr.Write([]byte(wantRes)) + })) + defer ts.Close() + + fleetCfg := &config.Fleet{ + Hosts: []string{ts.URL[7:]}, + Protocol: "http", + AccessAPIKey: apikey, + TLS: nil, + } + + cfgs := []config.SourceMapMetadata{ + { + ServiceName: name, + ServiceVersion: version, + BundleFilepath: path, + SourceMapURL: sourceMapPath, + }, + } + fb, err := newFleetStore(c, fleetCfg, cfgs) + assert.NoError(t, err) + + gotRes, err := fb.fetch(context.Background(), name, version, path) + assert.NoError(t, err) + + assert.Equal(t, wantRes, gotRes) + + assert.True(t, hasAuth) +} diff --git a/sourcemap/store.go b/sourcemap/store.go index 462ffedd3d4..aec47b54833 100644 --- a/sourcemap/store.go +++ b/sourcemap/store.go @@ -21,17 +21,14 @@ import ( "context" "math" "strings" + "sync" "time" - "github.com/elastic/apm-server/elasticsearch" - "github.com/go-sourcemap/sourcemap" gocache "github.com/patrickmn/go-cache" "github.com/pkg/errors" "github.com/elastic/beats/v7/libbeat/logp" - - logs "github.com/elastic/apm-server/log" ) const ( @@ -42,30 +39,41 @@ var ( errInit = errors.New("Cache cannot be initialized. Expiration and CleanupInterval need to be >= 0") ) -// Store holds information necessary to fetch a sourcemap, either from an Elasticsearch instance or an internal cache. +// Store holds information necessary to fetch a sourcemap, either from an +// Elasticsearch instance or an internal cache. type Store struct { cache *gocache.Cache - esStore *esStore + backend backend logger *logp.Logger + + mu sync.Mutex + inflight map[string]chan struct{} } -// NewStore creates a new instance for fetching sourcemaps. The client and index parameters are needed to be able to -// fetch sourcemaps from Elasticsearch. The expiration time is used for the internal cache. -func NewStore(client elasticsearch.Client, index string, expiration time.Duration) (*Store, error) { - if expiration < 0 { +type backend interface { + fetch(ctx context.Context, name, version, path string) (string, error) +} + +func newStore( + b backend, + logger *logp.Logger, + cacheExpiration time.Duration, +) (*Store, error) { + if cacheExpiration < 0 { return nil, errInit } - logger := logp.NewLogger(logs.Sourcemap) + return &Store{ - cache: gocache.New(expiration, cleanupInterval(expiration)), - esStore: &esStore{client: client, index: index, logger: logger}, - logger: logger, + cache: gocache.New(cacheExpiration, cleanupInterval(cacheExpiration)), + backend: b, + logger: logger, + inflight: make(map[string]chan struct{}), }, nil } // Fetch a sourcemap from the store. -func (s *Store) Fetch(ctx context.Context, name string, version string, path string) (*sourcemap.Consumer, error) { - key := key([]string{name, version, path}) +func (s *Store) Fetch(ctx context.Context, name, version, path string) (*sourcemap.Consumer, error) { + key := cacheKey([]string{name, version, path}) // fetch from cache if val, found := s.cache.Get(key); found { @@ -73,10 +81,43 @@ func (s *Store) Fetch(ctx context.Context, name string, version string, path str return consumer, nil } + // if the value hasn't been found, check to see if there's an inflight + // request to update the value. + s.mu.Lock() + wait, ok := s.inflight[key] + if ok { + // found an inflight request, wait for it to complete. + s.mu.Unlock() + + select { + case <-wait: + case <-ctx.Done(): + return nil, ctx.Err() + } + // Try to read the value again + return s.Fetch(ctx, name, version, path) + } + + // no inflight request found, add a channel to the map and then + // make the fetch request. + wait = make(chan struct{}) + s.inflight[key] = wait + + s.mu.Unlock() + + // Once the fetch request is complete, close and remove the channel + // from the syncronization map. + defer func() { + s.mu.Lock() + delete(s.inflight, key) + close(wait) + s.mu.Unlock() + }() + // fetch from Elasticsearch and ensure caching for all non-temporary results - sourcemapStr, err := s.esStore.fetch(ctx, name, version, path) + sourcemapStr, err := s.backend.fetch(ctx, name, version, path) if err != nil { - if !strings.Contains(err.Error(), errMsgESFailure) { + if !strings.Contains(err.Error(), "failure querying") { s.add(key, nil) } return nil, err @@ -93,6 +134,7 @@ func (s *Store) Fetch(ctx context.Context, name string, version string, path str return nil, errors.Wrap(err, errMsgParseSourcemap) } s.add(key, consumer) + return consumer, nil } @@ -102,7 +144,7 @@ func (s *Store) Added(ctx context.Context, name string, version string, path str s.logger.Warnf("Overriding sourcemap for service %s version %s and file %s", name, version, path) } - key := key([]string{name, version, path}) + key := cacheKey([]string{name, version, path}) s.cache.Delete(key) if !s.logger.IsDebug() { return @@ -118,7 +160,7 @@ func (s *Store) add(key string, consumer *sourcemap.Consumer) { s.logger.Debugf("Added id %v. Cache now has %v entries.", key, s.cache.ItemCount()) } -func key(s []string) string { +func cacheKey(s []string) string { return strings.Join(s, "_") } diff --git a/sourcemap/store_test.go b/sourcemap/store_test.go index 6c18722166e..ed2782d4438 100644 --- a/sourcemap/store_test.go +++ b/sourcemap/store_test.go @@ -18,8 +18,14 @@ package sourcemap import ( + "compress/zlib" "context" + "errors" "fmt" + "net/http" + "net/http/httptest" + "sync" + "sync/atomic" "testing" "time" @@ -28,16 +34,21 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/elastic/apm-server/beater/config" "github.com/elastic/apm-server/elasticsearch" + logs "github.com/elastic/apm-server/log" + "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/apm-server/sourcemap/test" ) -func Test_NewStore(t *testing.T) { - _, err := NewStore(nil, "", -1) +func Test_newStore(t *testing.T) { + logger := logp.NewLogger(logs.Sourcemap) + + _, err := newStore(nil, logger, -1) require.Error(t, err) - f, err := NewStore(nil, "", 100) + f, err := newStore(nil, logger, 100) require.NoError(t, err) assert.NotNil(t, f.cache) } @@ -142,6 +153,126 @@ func TestStore_Fetch(t *testing.T) { }) } +func TestFetchTimeout(t *testing.T) { + var ( + errs int64 + + apikey = "supersecret" + name = "webapp" + version = "1.0.0" + path = "/my/path/to/bundle.js.map" + c = http.DefaultClient + ) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + <-r.Context().Done() + })) + defer ts.Close() + + fleetCfg := &config.Fleet{ + Hosts: []string{ts.URL}, + Protocol: "https", + AccessAPIKey: apikey, + TLS: nil, + } + cfgs := []config.SourceMapMetadata{ + { + ServiceName: name, + ServiceVersion: version, + BundleFilepath: path, + SourceMapURL: "", + }, + } + b, err := newFleetStore(c, fleetCfg, cfgs) + assert.NoError(t, err) + logger := logp.NewLogger(logs.Sourcemap) + store, err := newStore(b, logger, time.Minute) + assert.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) + defer cancel() + + _, err = store.Fetch(ctx, name, version, path) + assert.True(t, errors.Is(err, context.DeadlineExceeded)) + atomic.AddInt64(&errs, 1) + + assert.Equal(t, int64(1), errs) +} + +func TestConcurrentFetch(t *testing.T) { + for _, tc := range []struct { + calledWant, errWant, succsWant int64 + }{ + {calledWant: 1, errWant: 0, succsWant: 10}, + {calledWant: 2, errWant: 1, succsWant: 9}, + {calledWant: 4, errWant: 3, succsWant: 7}, + } { + var ( + called, errs, succs int64 + + apikey = "supersecret" + name = "webapp" + version = "1.0.0" + path = "/my/path/to/bundle.js.map" + c = http.DefaultClient + + errsLeft = tc.errWant + ) + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt64(&called, 1) + // Simulate the wait for a network request. + time.Sleep(50 * time.Millisecond) + if errsLeft > 0 { + errsLeft-- + http.Error(w, "err", http.StatusInternalServerError) + return + } + wr := zlib.NewWriter(w) + defer wr.Close() + wr.Write([]byte(test.ValidSourcemap)) + })) + defer ts.Close() + + fleetCfg := &config.Fleet{ + Hosts: []string{ts.URL}, + Protocol: "https", + AccessAPIKey: apikey, + TLS: nil, + } + cfgs := []config.SourceMapMetadata{ + { + ServiceName: name, + ServiceVersion: version, + BundleFilepath: path, + SourceMapURL: "", + }, + } + store, err := NewFleetStore(c, fleetCfg, cfgs, time.Minute) + assert.NoError(t, err) + + var wg sync.WaitGroup + for i := 0; i < int(tc.succsWant+tc.errWant); i++ { + wg.Add(1) + go func() { + consumer, err := store.Fetch(context.Background(), name, version, path) + if err != nil { + atomic.AddInt64(&errs, 1) + } else { + assert.NotNil(t, consumer) + atomic.AddInt64(&succs, 1) + } + + wg.Done() + }() + } + + wg.Wait() + assert.Equal(t, tc.errWant, errs) + assert.Equal(t, tc.calledWant, called) + assert.Equal(t, tc.succsWant, succs) + } +} + func TestStore_Added(t *testing.T) { name, version, path := "foo", "1.0.1", "/tmp" key := "foo_1.0.1_/tmp" @@ -202,7 +333,7 @@ func TestCleanupInterval(t *testing.T) { } func testStore(t *testing.T, client elasticsearch.Client) *Store { - store, err := NewStore(client, "apm-*sourcemap*", time.Minute) + store, err := NewElasticsearchStore(client, "apm-*sourcemap*", time.Minute) require.NoError(t, err) return store } diff --git a/tests/system/test_integration_sourcemap.py b/tests/system/test_integration_sourcemap.py index fef3f21bb64..97534fa2a6a 100644 --- a/tests/system/test_integration_sourcemap.py +++ b/tests/system/test_integration_sourcemap.py @@ -208,29 +208,6 @@ def test_rum_transaction(self): assert frames_checked > 0, "no frames checked" -@integration_test -class SourcemapInvalidESConfig(BaseSourcemapTest): - def config(self): - cfg = super(SourcemapInvalidESConfig, self).config() - url = self.split_url(cfg) - cfg.update({ - "smap_es_host": url["host"], - "smap_es_username": url["username"], - "smap_es_password": "xxxx", - }) - return cfg - - def test_unauthorized(self): - # successful - uses output.elasticsearch.* configuration - self.upload_sourcemap() - # unauthorized - uses apm-server.rum.sourcemapping.elasticsearch configuration - self.load_docs_with_template(self.get_error_payload_path(), - self.intake_url, - 'error', - 1) - assert self.log_contains("unable to authenticate user") - - @integration_test class SourcemapESConfigUser(BaseSourcemapTest): def config(self):