Skip to content

Commit

Permalink
Service specific source maps (#5410)
Browse files Browse the repository at this point in the history
Serve service-specific sourcemaps from fleet-server
  • Loading branch information
stuartnelson3 authored Jun 23, 2021
1 parent 97a6f9b commit 9453fe2
Show file tree
Hide file tree
Showing 20 changed files with 619 additions and 94 deletions.
5 changes: 4 additions & 1 deletion _meta/beat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,12 @@ apm-server:
# Sourcemapping is enabled by default.
#enabled: true

# Source maps are always fetched from Elasticsearch, by default using the output.elasticsearch configuration.
# Source maps may be fetched from Elasticsearch by using the
# output.elasticsearch configuration.
# A different instance must be configured when using any other output.
# This setting only affects sourcemap reads - the output determines where sourcemaps are written.
# Note: Configuring elasticsearch is not supported if apm-server is being
# managed by Fleet.
#elasticsearch:
# Array of hosts to connect to.
# Scheme and port can be left out and will be set to the default (`http` and `9200`).
Expand Down
5 changes: 4 additions & 1 deletion apm-server.docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,12 @@ apm-server:
# Sourcemapping is enabled by default.
#enabled: true

# Source maps are always fetched from Elasticsearch, by default using the output.elasticsearch configuration.
# Source maps may be fetched from Elasticsearch by using the
# output.elasticsearch configuration.
# A different instance must be configured when using any other output.
# This setting only affects sourcemap reads - the output determines where sourcemaps are written.
# Note: Configuring elasticsearch is not supported if apm-server is being
# managed by Fleet.
#elasticsearch:
# Array of hosts to connect to.
# Scheme and port can be left out and will be set to the default (`http` and `9200`).
Expand Down
5 changes: 4 additions & 1 deletion apm-server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,12 @@ apm-server:
# Sourcemapping is enabled by default.
#enabled: true

# Source maps are always fetched from Elasticsearch, by default using the output.elasticsearch configuration.
# Source maps may be fetched from Elasticsearch by using the
# output.elasticsearch configuration.
# A different instance must be configured when using any other output.
# This setting only affects sourcemap reads - the output determines where sourcemaps are written.
# Note: Configuring elasticsearch is not supported if apm-server is being
# managed by Fleet.
#elasticsearch:
# Array of hosts to connect to.
# Scheme and port can be left out and will be set to the default (`http` and `9200`).
Expand Down
6 changes: 0 additions & 6 deletions apmpackage/apm/manifest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,6 @@ policy_templates:
required: false
show_user: false
default: 10000
- name: sourcemap_api_key
type: text
title: RUM - API Key for Sourcemaps
required: false
description: API Key for sourcemap feature. Enter as <Id>:<API Key>
show_user: false
- name: api_key_limit
type: integer
title: Maximum number of API Keys for Agent authentication
Expand Down
68 changes: 49 additions & 19 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,20 @@ package beater
import (
"context"
"net"
"net/http"
"regexp"
"runtime"
"strings"
"sync"
"time"

"github.com/elastic/beats/v7/libbeat/common/fleetmode"

"github.com/elastic/beats/v7/libbeat/kibana"
"github.com/elastic/beats/v7/libbeat/common/transport"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"

"github.com/pkg/errors"
"go.elastic.co/apm"
"go.elastic.co/apm/module/apmhttp"
"golang.org/x/sync/errgroup"

"github.com/elastic/beats/v7/libbeat/beat"
Expand Down Expand Up @@ -256,8 +258,8 @@ func (s *serverCreator) Create(p beat.PipelineConnector, rawConfig *common.Confi
sharedServerRunnerParams: s.args,
Namespace: namespace,
Pipeline: p,
KibanaConfig: &integrationConfig.Fleet.Kibana,
RawConfig: apmServerCommonConfig,
FleetConfig: &integrationConfig.Fleet,
})
}

Expand All @@ -279,6 +281,7 @@ type serverRunner struct {
acker *waitPublishedAcker
namespace string
config *config.Config
fleetConfig *config.Fleet
beat *beat.Beat
logger *logp.Logger
tracer *apm.Tracer
Expand All @@ -289,10 +292,10 @@ type serverRunner struct {
type serverRunnerParams struct {
sharedServerRunnerParams

Namespace string
Pipeline beat.PipelineConnector
KibanaConfig *kibana.ClientConfig
RawConfig *common.Config
Namespace string
Pipeline beat.PipelineConnector
RawConfig *common.Config
FleetConfig *config.Fleet
}

type sharedServerRunnerParams struct {
Expand All @@ -310,17 +313,14 @@ func newServerRunner(ctx context.Context, args serverRunnerParams) (*serverRunne
return nil, err
}

if cfg.DataStreams.Enabled && args.KibanaConfig != nil {
cfg.Kibana.ClientConfig = *args.KibanaConfig
}

runServerContext, cancel := context.WithCancel(ctx)
return &serverRunner{
backgroundContext: ctx,
runServerContext: runServerContext,
cancelRunServerContext: cancel,

config: cfg,
fleetConfig: args.FleetConfig,
acker: args.Acker,
pipeline: args.Pipeline,
namespace: args.Namespace,
Expand Down Expand Up @@ -359,8 +359,7 @@ func (s *serverRunner) Start() {
func (s *serverRunner) run() error {
// Send config to telemetry.
recordAPMServerConfig(s.config)

transformConfig, err := newTransformConfig(s.beat.Info, s.config)
transformConfig, err := newTransformConfig(s.beat.Info, s.config, s.fleetConfig)
if err != nil {
return err
}
Expand Down Expand Up @@ -602,7 +601,7 @@ func runServerWithTracerServer(runServer RunServerFunc, tracerServer *tracerServ
}
}

func newTransformConfig(beatInfo beat.Info, cfg *config.Config) (*transform.Config, error) {
func newTransformConfig(beatInfo beat.Info, cfg *config.Config, fleetCfg *config.Fleet) (*transform.Config, error) {
transformConfig := &transform.Config{
DataStreams: cfg.DataStreams.Enabled,
RUM: transform.RUMConfig{
Expand All @@ -611,8 +610,8 @@ func newTransformConfig(beatInfo beat.Info, cfg *config.Config) (*transform.Conf
},
}

if cfg.RumConfig.Enabled && cfg.RumConfig.SourceMapping.Enabled && cfg.RumConfig.SourceMapping.ESConfig != nil {
store, err := newSourcemapStore(beatInfo, cfg.RumConfig.SourceMapping)
if cfg.RumConfig.Enabled && cfg.RumConfig.SourceMapping.Enabled {
store, err := newSourcemapStore(beatInfo, cfg.RumConfig.SourceMapping, fleetCfg)
if err != nil {
return nil, err
}
Expand All @@ -622,13 +621,44 @@ func newTransformConfig(beatInfo beat.Info, cfg *config.Config) (*transform.Conf
return transformConfig, nil
}

func newSourcemapStore(beatInfo beat.Info, cfg config.SourceMapping) (*sourcemap.Store, error) {
esClient, err := elasticsearch.NewClient(cfg.ESConfig)
func newSourcemapStore(beatInfo beat.Info, cfg config.SourceMapping, fleetCfg *config.Fleet) (*sourcemap.Store, error) {
if fleetCfg != nil {
var (
c = *http.DefaultClient
rt = http.DefaultTransport
)
var tlsConfig *tlscommon.TLSConfig
var err error
if fleetCfg.TLS.IsEnabled() {
if tlsConfig, err = tlscommon.LoadTLSConfig(fleetCfg.TLS); err != nil {
return nil, err
}
}

// Default for es is 90s :shrug:
timeout := 30 * time.Second
dialer := transport.NetDialer(timeout)
tlsDialer, err := transport.TLSDialer(dialer, tlsConfig, timeout)
if err != nil {
return nil, err
}

rt = &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: dialer.Dial,
DialTLS: tlsDialer.Dial,
TLSClientConfig: tlsConfig.ToConfig(),
}

c.Transport = apmhttp.WrapRoundTripper(rt)
return sourcemap.NewFleetStore(&c, fleetCfg, cfg.Metadata, cfg.Cache.Expiration)
}
c, err := elasticsearch.NewClient(cfg.ESConfig)
if err != nil {
return nil, err
}
index := strings.ReplaceAll(cfg.IndexPattern, "%{[observer.version]}", beatInfo.Version)
return sourcemap.NewStore(esClient, index, cfg.Cache.Expiration)
return sourcemap.NewElasticsearchStore(c, index, cfg.Cache.Expiration)
}

// WrapRunServerWithProcessors wraps runServer such that it wraps args.Reporter
Expand Down
68 changes: 66 additions & 2 deletions beater/beater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package beater

import (
"compress/zlib"
"context"
"errors"
"net"
Expand All @@ -36,7 +37,9 @@ import (
"go.uber.org/zap/zaptest/observer"

"github.com/elastic/apm-server/beater/config"
"github.com/elastic/apm-server/elasticsearch"
"github.com/elastic/apm-server/model"
"github.com/elastic/apm-server/sourcemap/test"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/instrumentation"
Expand Down Expand Up @@ -246,7 +249,7 @@ func TestTransformConfigIndex(t *testing.T) {
cfg.RumConfig.SourceMapping.IndexPattern = indexPattern
}

transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg)
transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg, nil)
require.NoError(t, err)
require.NotNil(t, transformConfig.RUM.SourcemapStore)
transformConfig.RUM.SourcemapStore.Added(context.Background(), "name", "version", "path")
Expand All @@ -266,7 +269,7 @@ func TestTransformConfig(t *testing.T) {
cfg := config.DefaultConfig()
cfg.RumConfig.Enabled = rumEnabled
cfg.RumConfig.SourceMapping.Enabled = sourcemapEnabled
transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg)
transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg, nil)
require.NoError(t, err)
if expectSourcemapStore {
assert.NotNil(t, transformConfig.RUM.SourcemapStore)
Expand All @@ -280,3 +283,64 @@ func TestTransformConfig(t *testing.T) {
test(true, false, false)
test(true, true, true)
}

func TestStoreUsesRUMElasticsearchConfig(t *testing.T) {
var called bool
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
called = true
w.Write([]byte(test.ValidSourcemap))
}))
defer ts.Close()

cfg := config.DefaultConfig()
cfg.RumConfig.Enabled = true
cfg.RumConfig.SourceMapping.Enabled = true
cfg.RumConfig.SourceMapping.ESConfig = elasticsearch.DefaultConfig()
cfg.RumConfig.SourceMapping.ESConfig.Hosts = []string{ts.URL}

transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg, nil)
require.NoError(t, err)
// Check that the provided rum elasticsearch config was used and
// Fetch() goes to the test server.
_, err = transformConfig.RUM.SourcemapStore.Fetch(context.Background(), "app", "1.0", "/bundle/path")
require.NoError(t, err)

assert.True(t, called)
}

func TestFleetStoreUsed(t *testing.T) {
var called bool
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
called = true
wr := zlib.NewWriter(w)
defer wr.Close()
wr.Write([]byte(test.ValidSourcemap))
}))
defer ts.Close()

cfg := config.DefaultConfig()
cfg.RumConfig.Enabled = true
cfg.RumConfig.SourceMapping.Enabled = true
cfg.RumConfig.SourceMapping.Metadata = []config.SourceMapMetadata{{
ServiceName: "app",
ServiceVersion: "1.0",
BundleFilepath: "/bundle/path",
SourceMapURL: "/my/path",
}}

fleetCfg := &config.Fleet{
Hosts: []string{ts.URL[7:]},
Protocol: "http",
AccessAPIKey: "my-key",
TLS: nil,
}

transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg, fleetCfg)
require.NoError(t, err)
// Check that the provided rum elasticsearch config was used and
// Fetch() goes to the test server.
_, err = transformConfig.RUM.SourcemapStore.Fetch(context.Background(), "app", "1.0", "/bundle/path")
require.NoError(t, err)

assert.True(t, called)
}
17 changes: 17 additions & 0 deletions beater/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ func TestUnpackConfig(t *testing.T) {
MaxRetries: 3,
Backoff: elasticsearch.DefaultBackoffConfig,
},
Metadata: []SourceMapMetadata{},
esConfigured: true,
},
LibraryPattern: "^custom",
Expand Down Expand Up @@ -295,6 +296,14 @@ func TestUnpackConfig(t *testing.T) {
"rum": map[string]interface{}{
"enabled": true,
"source_mapping": map[string]interface{}{
"metadata": []map[string]string{
{
"service.name": "opbeans-rum",
"service.version": "1.2.3",
"bundle.filepath": "/test/e2e/general-usecase/bundle.js.map",
"sourcemap.url": "http://somewhere.com/bundle.js.map",
},
},
"cache": map[string]interface{}{
"expiration": 7,
},
Expand Down Expand Up @@ -376,6 +385,14 @@ func TestUnpackConfig(t *testing.T) {
},
IndexPattern: "apm-*-sourcemap*",
ESConfig: elasticsearch.DefaultConfig(),
Metadata: []SourceMapMetadata{
{
ServiceName: "opbeans-rum",
ServiceVersion: "1.2.3",
BundleFilepath: "/test/e2e/general-usecase/bundle.js.map",
SourceMapURL: "http://somewhere.com/bundle.js.map",
},
},
},
LibraryPattern: "rum",
ExcludeFromGrouping: "^/webpack",
Expand Down
7 changes: 5 additions & 2 deletions beater/config/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"errors"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/kibana"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
)

func NewIntegrationConfig(rootConfig *common.Config) (*IntegrationConfig, error) {
Expand Down Expand Up @@ -66,5 +66,8 @@ type Package struct {
}

type Fleet struct {
Kibana kibana.ClientConfig `config:"kibana"`
Hosts []string `config:"hosts"`
Protocol string `config:"protocol"`
AccessAPIKey string `config:"access_api_key"`
TLS *tlscommon.Config `config:"ssl"`
}
Loading

0 comments on commit 9453fe2

Please sign in to comment.