Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x] Service specific source maps (backport #5410) #5524

Merged
merged 3 commits into from
Jun 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion _meta/beat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,12 @@ apm-server:
# Sourcemapping is enabled by default.
#enabled: true

# Source maps are always fetched from Elasticsearch, by default using the output.elasticsearch configuration.
# Source maps may be fetched from Elasticsearch by using the
# output.elasticsearch configuration.
# A different instance must be configured when using any other output.
# This setting only affects sourcemap reads - the output determines where sourcemaps are written.
# Note: Configuring elasticsearch is not supported if apm-server is being
# managed by Fleet.
#elasticsearch:
# Array of hosts to connect to.
# Scheme and port can be left out and will be set to the default (`http` and `9200`).
Expand Down
5 changes: 4 additions & 1 deletion apm-server.docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,12 @@ apm-server:
# Sourcemapping is enabled by default.
#enabled: true

# Source maps are always fetched from Elasticsearch, by default using the output.elasticsearch configuration.
# Source maps may be fetched from Elasticsearch by using the
# output.elasticsearch configuration.
# A different instance must be configured when using any other output.
# This setting only affects sourcemap reads - the output determines where sourcemaps are written.
# Note: Configuring elasticsearch is not supported if apm-server is being
# managed by Fleet.
#elasticsearch:
# Array of hosts to connect to.
# Scheme and port can be left out and will be set to the default (`http` and `9200`).
Expand Down
5 changes: 4 additions & 1 deletion apm-server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,12 @@ apm-server:
# Sourcemapping is enabled by default.
#enabled: true

# Source maps are always fetched from Elasticsearch, by default using the output.elasticsearch configuration.
# Source maps may be fetched from Elasticsearch by using the
# output.elasticsearch configuration.
# A different instance must be configured when using any other output.
# This setting only affects sourcemap reads - the output determines where sourcemaps are written.
# Note: Configuring elasticsearch is not supported if apm-server is being
# managed by Fleet.
#elasticsearch:
# Array of hosts to connect to.
# Scheme and port can be left out and will be set to the default (`http` and `9200`).
Expand Down
8 changes: 1 addition & 7 deletions apmpackage/apm/manifest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,7 @@ policy_templates:
description: Number of unique IPs to be cached for the rate limiter.
required: false
show_user: false
default: 1000
- name: sourcemap_api_key
type: text
title: RUM - API Key for Sourcemaps
required: false
description: API Key for sourcemap feature. Enter as <Id>:<API Key>
show_user: false
default: 10000
- name: api_key_limit
type: integer
title: Maximum number of API Keys for Agent authentication
Expand Down
68 changes: 49 additions & 19 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,20 @@ package beater
import (
"context"
"net"
"net/http"
"regexp"
"runtime"
"strings"
"sync"
"time"

"github.com/elastic/beats/v7/libbeat/common/fleetmode"

"github.com/elastic/beats/v7/libbeat/kibana"
"github.com/elastic/beats/v7/libbeat/common/transport"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"

"github.com/pkg/errors"
"go.elastic.co/apm"
"go.elastic.co/apm/module/apmhttp"
"golang.org/x/sync/errgroup"

"github.com/elastic/beats/v7/libbeat/beat"
Expand Down Expand Up @@ -256,8 +258,8 @@ func (s *serverCreator) Create(p beat.PipelineConnector, rawConfig *common.Confi
sharedServerRunnerParams: s.args,
Namespace: namespace,
Pipeline: p,
KibanaConfig: &integrationConfig.Fleet.Kibana,
RawConfig: apmServerCommonConfig,
FleetConfig: &integrationConfig.Fleet,
})
}

Expand All @@ -279,6 +281,7 @@ type serverRunner struct {
acker *waitPublishedAcker
namespace string
config *config.Config
fleetConfig *config.Fleet
beat *beat.Beat
logger *logp.Logger
tracer *apm.Tracer
Expand All @@ -289,10 +292,10 @@ type serverRunner struct {
type serverRunnerParams struct {
sharedServerRunnerParams

Namespace string
Pipeline beat.PipelineConnector
KibanaConfig *kibana.ClientConfig
RawConfig *common.Config
Namespace string
Pipeline beat.PipelineConnector
RawConfig *common.Config
FleetConfig *config.Fleet
}

type sharedServerRunnerParams struct {
Expand All @@ -310,17 +313,14 @@ func newServerRunner(ctx context.Context, args serverRunnerParams) (*serverRunne
return nil, err
}

if cfg.DataStreams.Enabled && args.KibanaConfig != nil {
cfg.Kibana.ClientConfig = *args.KibanaConfig
}

runServerContext, cancel := context.WithCancel(ctx)
return &serverRunner{
backgroundContext: ctx,
runServerContext: runServerContext,
cancelRunServerContext: cancel,

config: cfg,
fleetConfig: args.FleetConfig,
acker: args.Acker,
pipeline: args.Pipeline,
namespace: args.Namespace,
Expand Down Expand Up @@ -359,8 +359,7 @@ func (s *serverRunner) Start() {
func (s *serverRunner) run() error {
// Send config to telemetry.
recordAPMServerConfig(s.config)

transformConfig, err := newTransformConfig(s.beat.Info, s.config)
transformConfig, err := newTransformConfig(s.beat.Info, s.config, s.fleetConfig)
if err != nil {
return err
}
Expand Down Expand Up @@ -602,7 +601,7 @@ func runServerWithTracerServer(runServer RunServerFunc, tracerServer *tracerServ
}
}

func newTransformConfig(beatInfo beat.Info, cfg *config.Config) (*transform.Config, error) {
func newTransformConfig(beatInfo beat.Info, cfg *config.Config, fleetCfg *config.Fleet) (*transform.Config, error) {
transformConfig := &transform.Config{
DataStreams: cfg.DataStreams.Enabled,
RUM: transform.RUMConfig{
Expand All @@ -611,8 +610,8 @@ func newTransformConfig(beatInfo beat.Info, cfg *config.Config) (*transform.Conf
},
}

if cfg.RumConfig.Enabled && cfg.RumConfig.SourceMapping.Enabled && cfg.RumConfig.SourceMapping.ESConfig != nil {
store, err := newSourcemapStore(beatInfo, cfg.RumConfig.SourceMapping)
if cfg.RumConfig.Enabled && cfg.RumConfig.SourceMapping.Enabled {
store, err := newSourcemapStore(beatInfo, cfg.RumConfig.SourceMapping, fleetCfg)
if err != nil {
return nil, err
}
Expand All @@ -622,13 +621,44 @@ func newTransformConfig(beatInfo beat.Info, cfg *config.Config) (*transform.Conf
return transformConfig, nil
}

func newSourcemapStore(beatInfo beat.Info, cfg config.SourceMapping) (*sourcemap.Store, error) {
esClient, err := elasticsearch.NewClient(cfg.ESConfig)
func newSourcemapStore(beatInfo beat.Info, cfg config.SourceMapping, fleetCfg *config.Fleet) (*sourcemap.Store, error) {
if fleetCfg != nil {
var (
c = *http.DefaultClient
rt = http.DefaultTransport
)
var tlsConfig *tlscommon.TLSConfig
var err error
if fleetCfg.TLS.IsEnabled() {
if tlsConfig, err = tlscommon.LoadTLSConfig(fleetCfg.TLS); err != nil {
return nil, err
}
}

// Default for es is 90s :shrug:
timeout := 30 * time.Second
dialer := transport.NetDialer(timeout)
tlsDialer, err := transport.TLSDialer(dialer, tlsConfig, timeout)
if err != nil {
return nil, err
}

rt = &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: dialer.Dial,
DialTLS: tlsDialer.Dial,
TLSClientConfig: tlsConfig.ToConfig(),
}

c.Transport = apmhttp.WrapRoundTripper(rt)
return sourcemap.NewFleetStore(&c, fleetCfg, cfg.Metadata, cfg.Cache.Expiration)
}
c, err := elasticsearch.NewClient(cfg.ESConfig)
if err != nil {
return nil, err
}
index := strings.ReplaceAll(cfg.IndexPattern, "%{[observer.version]}", beatInfo.Version)
return sourcemap.NewStore(esClient, index, cfg.Cache.Expiration)
return sourcemap.NewElasticsearchStore(c, index, cfg.Cache.Expiration)
}

// WrapRunServerWithProcessors wraps runServer such that it wraps args.Reporter
Expand Down
68 changes: 66 additions & 2 deletions beater/beater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package beater

import (
"compress/zlib"
"context"
"errors"
"net"
Expand All @@ -36,7 +37,9 @@ import (
"go.uber.org/zap/zaptest/observer"

"github.com/elastic/apm-server/beater/config"
"github.com/elastic/apm-server/elasticsearch"
"github.com/elastic/apm-server/model"
"github.com/elastic/apm-server/sourcemap/test"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/instrumentation"
Expand Down Expand Up @@ -246,7 +249,7 @@ func TestTransformConfigIndex(t *testing.T) {
cfg.RumConfig.SourceMapping.IndexPattern = indexPattern
}

transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg)
transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg, nil)
require.NoError(t, err)
require.NotNil(t, transformConfig.RUM.SourcemapStore)
transformConfig.RUM.SourcemapStore.Added(context.Background(), "name", "version", "path")
Expand All @@ -266,7 +269,7 @@ func TestTransformConfig(t *testing.T) {
cfg := config.DefaultConfig()
cfg.RumConfig.Enabled = rumEnabled
cfg.RumConfig.SourceMapping.Enabled = sourcemapEnabled
transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg)
transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg, nil)
require.NoError(t, err)
if expectSourcemapStore {
assert.NotNil(t, transformConfig.RUM.SourcemapStore)
Expand All @@ -280,3 +283,64 @@ func TestTransformConfig(t *testing.T) {
test(true, false, false)
test(true, true, true)
}

func TestStoreUsesRUMElasticsearchConfig(t *testing.T) {
var called bool
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
called = true
w.Write([]byte(test.ValidSourcemap))
}))
defer ts.Close()

cfg := config.DefaultConfig()
cfg.RumConfig.Enabled = true
cfg.RumConfig.SourceMapping.Enabled = true
cfg.RumConfig.SourceMapping.ESConfig = elasticsearch.DefaultConfig()
cfg.RumConfig.SourceMapping.ESConfig.Hosts = []string{ts.URL}

transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg, nil)
require.NoError(t, err)
// Check that the provided rum elasticsearch config was used and
// Fetch() goes to the test server.
_, err = transformConfig.RUM.SourcemapStore.Fetch(context.Background(), "app", "1.0", "/bundle/path")
require.NoError(t, err)

assert.True(t, called)
}

func TestFleetStoreUsed(t *testing.T) {
var called bool
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
called = true
wr := zlib.NewWriter(w)
defer wr.Close()
wr.Write([]byte(test.ValidSourcemap))
}))
defer ts.Close()

cfg := config.DefaultConfig()
cfg.RumConfig.Enabled = true
cfg.RumConfig.SourceMapping.Enabled = true
cfg.RumConfig.SourceMapping.Metadata = []config.SourceMapMetadata{{
ServiceName: "app",
ServiceVersion: "1.0",
BundleFilepath: "/bundle/path",
SourceMapURL: "/my/path",
}}

fleetCfg := &config.Fleet{
Hosts: []string{ts.URL[7:]},
Protocol: "http",
AccessAPIKey: "my-key",
TLS: nil,
}

transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg, fleetCfg)
require.NoError(t, err)
// Check that the provided rum elasticsearch config was used and
// Fetch() goes to the test server.
_, err = transformConfig.RUM.SourcemapStore.Fetch(context.Background(), "app", "1.0", "/bundle/path")
require.NoError(t, err)

assert.True(t, called)
}
17 changes: 17 additions & 0 deletions beater/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ func TestUnpackConfig(t *testing.T) {
MaxRetries: 3,
Backoff: elasticsearch.DefaultBackoffConfig,
},
Metadata: []SourceMapMetadata{},
esConfigured: true,
},
LibraryPattern: "^custom",
Expand Down Expand Up @@ -295,6 +296,14 @@ func TestUnpackConfig(t *testing.T) {
"rum": map[string]interface{}{
"enabled": true,
"source_mapping": map[string]interface{}{
"metadata": []map[string]string{
{
"service.name": "opbeans-rum",
"service.version": "1.2.3",
"bundle.filepath": "/test/e2e/general-usecase/bundle.js.map",
"sourcemap.url": "http://somewhere.com/bundle.js.map",
},
},
"cache": map[string]interface{}{
"expiration": 7,
},
Expand Down Expand Up @@ -376,6 +385,14 @@ func TestUnpackConfig(t *testing.T) {
},
IndexPattern: "apm-*-sourcemap*",
ESConfig: elasticsearch.DefaultConfig(),
Metadata: []SourceMapMetadata{
{
ServiceName: "opbeans-rum",
ServiceVersion: "1.2.3",
BundleFilepath: "/test/e2e/general-usecase/bundle.js.map",
SourceMapURL: "http://somewhere.com/bundle.js.map",
},
},
},
LibraryPattern: "rum",
ExcludeFromGrouping: "^/webpack",
Expand Down
7 changes: 5 additions & 2 deletions beater/config/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"errors"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/kibana"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
)

func NewIntegrationConfig(rootConfig *common.Config) (*IntegrationConfig, error) {
Expand Down Expand Up @@ -66,5 +66,8 @@ type Package struct {
}

type Fleet struct {
Kibana kibana.ClientConfig `config:"kibana"`
Hosts []string `config:"hosts"`
Protocol string `config:"protocol"`
AccessAPIKey string `config:"access_api_key"`
TLS *tlscommon.Config `config:"ssl"`
}
Loading