Skip to content

Commit

Permalink
Merge branch 'master' into java-attacher-config
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartnelson3 authored Jun 23, 2021
2 parents b21a92a + 9453fe2 commit 98949ad
Show file tree
Hide file tree
Showing 33 changed files with 985 additions and 291 deletions.
5 changes: 4 additions & 1 deletion _meta/beat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,12 @@ apm-server:
# Sourcemapping is enabled by default.
#enabled: true

# Source maps are always fetched from Elasticsearch, by default using the output.elasticsearch configuration.
# Source maps may be fetched from Elasticsearch by using the
# output.elasticsearch configuration.
# A different instance must be configured when using any other output.
# This setting only affects sourcemap reads - the output determines where sourcemaps are written.
# Note: Configuring elasticsearch is not supported if apm-server is being
# managed by Fleet.
#elasticsearch:
# Array of hosts to connect to.
# Scheme and port can be left out and will be set to the default (`http` and `9200`).
Expand Down
5 changes: 4 additions & 1 deletion apm-server.docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,12 @@ apm-server:
# Sourcemapping is enabled by default.
#enabled: true

# Source maps are always fetched from Elasticsearch, by default using the output.elasticsearch configuration.
# Source maps may be fetched from Elasticsearch by using the
# output.elasticsearch configuration.
# A different instance must be configured when using any other output.
# This setting only affects sourcemap reads - the output determines where sourcemaps are written.
# Note: Configuring elasticsearch is not supported if apm-server is being
# managed by Fleet.
#elasticsearch:
# Array of hosts to connect to.
# Scheme and port can be left out and will be set to the default (`http` and `9200`).
Expand Down
5 changes: 4 additions & 1 deletion apm-server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,12 @@ apm-server:
# Sourcemapping is enabled by default.
#enabled: true

# Source maps are always fetched from Elasticsearch, by default using the output.elasticsearch configuration.
# Source maps may be fetched from Elasticsearch by using the
# output.elasticsearch configuration.
# A different instance must be configured when using any other output.
# This setting only affects sourcemap reads - the output determines where sourcemaps are written.
# Note: Configuring elasticsearch is not supported if apm-server is being
# managed by Fleet.
#elasticsearch:
# Array of hosts to connect to.
# Scheme and port can be left out and will be set to the default (`http` and `9200`).
Expand Down
6 changes: 0 additions & 6 deletions apmpackage/apm/manifest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,6 @@ policy_templates:
required: false
show_user: false
default: 10000
- name: sourcemap_api_key
type: text
title: RUM - API Key for Sourcemaps
required: false
description: API Key for sourcemap feature. Enter as <Id>:<API Key>
show_user: false
- name: api_key_limit
type: integer
title: Maximum number of API Keys for Agent authentication
Expand Down
2 changes: 2 additions & 0 deletions beater/api/intake/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ func Handler(handler StreamHandler, requestMetadataFunc RequestMetadataFunc, bat
return
}

// copy batchProcessor to avoid updating closure below
batchProcessor := batchProcessor
if limiter, ok := ratelimit.FromContext(c.Request.Context()); ok {
// Apply rate limiting after reading but before processing any events.
batchProcessor = modelprocessor.Chained{
Expand Down
26 changes: 26 additions & 0 deletions beater/api/intake/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,32 @@ func TestRateLimiting(t *testing.T) {
}
}

func TestRateLimitingRequests(t *testing.T) {
// Check that rate limiting across multiple requests is handled correctly.
//
// ratelimit.ndjson contains 19 events, and we rate limit in batches of 10
// events. The burst of 41 should be enough for 2 iterations with one left.
limiter := rate.NewLimiter(1, 41)
processor := stream.BackendProcessor(config.DefaultConfig())
handler := Handler(processor, emptyRequestMetadata, modelprocessor.Nop{})

data, err := ioutil.ReadFile("../../../testdata/intake-v2/ratelimit.ndjson")
require.NoError(t, err)
for i := 0; i < 2; i++ {
r := httptest.NewRequest("POST", "/", bytes.NewBuffer(data))
r = r.WithContext(ratelimit.ContextWithLimiter(r.Context(), limiter))
r.Header.Add("Content-Type", "application/x-ndjson")

w := httptest.NewRecorder()
c := request.NewContext()
c.Reset(w, r)
handler(c)
assert.Equal(t, http.StatusAccepted, w.Code)
}
assert.True(t, limiter.Allow())
assert.False(t, limiter.Allow())
}

type testcaseIntakeHandler struct {
c *request.Context
w *httptest.ResponseRecorder
Expand Down
68 changes: 49 additions & 19 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,20 @@ package beater
import (
"context"
"net"
"net/http"
"regexp"
"runtime"
"strings"
"sync"
"time"

"github.com/elastic/beats/v7/libbeat/common/fleetmode"

"github.com/elastic/beats/v7/libbeat/kibana"
"github.com/elastic/beats/v7/libbeat/common/transport"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"

"github.com/pkg/errors"
"go.elastic.co/apm"
"go.elastic.co/apm/module/apmhttp"
"golang.org/x/sync/errgroup"

"github.com/elastic/beats/v7/libbeat/beat"
Expand Down Expand Up @@ -256,8 +258,8 @@ func (s *serverCreator) Create(p beat.PipelineConnector, rawConfig *common.Confi
sharedServerRunnerParams: s.args,
Namespace: namespace,
Pipeline: p,
KibanaConfig: &integrationConfig.Fleet.Kibana,
RawConfig: apmServerCommonConfig,
FleetConfig: &integrationConfig.Fleet,
})
}

Expand All @@ -279,6 +281,7 @@ type serverRunner struct {
acker *waitPublishedAcker
namespace string
config *config.Config
fleetConfig *config.Fleet
beat *beat.Beat
logger *logp.Logger
tracer *apm.Tracer
Expand All @@ -289,10 +292,10 @@ type serverRunner struct {
type serverRunnerParams struct {
sharedServerRunnerParams

Namespace string
Pipeline beat.PipelineConnector
KibanaConfig *kibana.ClientConfig
RawConfig *common.Config
Namespace string
Pipeline beat.PipelineConnector
RawConfig *common.Config
FleetConfig *config.Fleet
}

type sharedServerRunnerParams struct {
Expand All @@ -310,17 +313,14 @@ func newServerRunner(ctx context.Context, args serverRunnerParams) (*serverRunne
return nil, err
}

if cfg.DataStreams.Enabled && args.KibanaConfig != nil {
cfg.Kibana.ClientConfig = *args.KibanaConfig
}

runServerContext, cancel := context.WithCancel(ctx)
return &serverRunner{
backgroundContext: ctx,
runServerContext: runServerContext,
cancelRunServerContext: cancel,

config: cfg,
fleetConfig: args.FleetConfig,
acker: args.Acker,
pipeline: args.Pipeline,
namespace: args.Namespace,
Expand Down Expand Up @@ -359,8 +359,7 @@ func (s *serverRunner) Start() {
func (s *serverRunner) run() error {
// Send config to telemetry.
recordAPMServerConfig(s.config)

transformConfig, err := newTransformConfig(s.beat.Info, s.config)
transformConfig, err := newTransformConfig(s.beat.Info, s.config, s.fleetConfig)
if err != nil {
return err
}
Expand Down Expand Up @@ -602,7 +601,7 @@ func runServerWithTracerServer(runServer RunServerFunc, tracerServer *tracerServ
}
}

func newTransformConfig(beatInfo beat.Info, cfg *config.Config) (*transform.Config, error) {
func newTransformConfig(beatInfo beat.Info, cfg *config.Config, fleetCfg *config.Fleet) (*transform.Config, error) {
transformConfig := &transform.Config{
DataStreams: cfg.DataStreams.Enabled,
RUM: transform.RUMConfig{
Expand All @@ -611,8 +610,8 @@ func newTransformConfig(beatInfo beat.Info, cfg *config.Config) (*transform.Conf
},
}

if cfg.RumConfig.Enabled && cfg.RumConfig.SourceMapping.Enabled && cfg.RumConfig.SourceMapping.ESConfig != nil {
store, err := newSourcemapStore(beatInfo, cfg.RumConfig.SourceMapping)
if cfg.RumConfig.Enabled && cfg.RumConfig.SourceMapping.Enabled {
store, err := newSourcemapStore(beatInfo, cfg.RumConfig.SourceMapping, fleetCfg)
if err != nil {
return nil, err
}
Expand All @@ -622,13 +621,44 @@ func newTransformConfig(beatInfo beat.Info, cfg *config.Config) (*transform.Conf
return transformConfig, nil
}

func newSourcemapStore(beatInfo beat.Info, cfg config.SourceMapping) (*sourcemap.Store, error) {
esClient, err := elasticsearch.NewClient(cfg.ESConfig)
func newSourcemapStore(beatInfo beat.Info, cfg config.SourceMapping, fleetCfg *config.Fleet) (*sourcemap.Store, error) {
if fleetCfg != nil {
var (
c = *http.DefaultClient
rt = http.DefaultTransport
)
var tlsConfig *tlscommon.TLSConfig
var err error
if fleetCfg.TLS.IsEnabled() {
if tlsConfig, err = tlscommon.LoadTLSConfig(fleetCfg.TLS); err != nil {
return nil, err
}
}

// Default for es is 90s :shrug:
timeout := 30 * time.Second
dialer := transport.NetDialer(timeout)
tlsDialer, err := transport.TLSDialer(dialer, tlsConfig, timeout)
if err != nil {
return nil, err
}

rt = &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: dialer.Dial,
DialTLS: tlsDialer.Dial,
TLSClientConfig: tlsConfig.ToConfig(),
}

c.Transport = apmhttp.WrapRoundTripper(rt)
return sourcemap.NewFleetStore(&c, fleetCfg, cfg.Metadata, cfg.Cache.Expiration)
}
c, err := elasticsearch.NewClient(cfg.ESConfig)
if err != nil {
return nil, err
}
index := strings.ReplaceAll(cfg.IndexPattern, "%{[observer.version]}", beatInfo.Version)
return sourcemap.NewStore(esClient, index, cfg.Cache.Expiration)
return sourcemap.NewElasticsearchStore(c, index, cfg.Cache.Expiration)
}

// WrapRunServerWithProcessors wraps runServer such that it wraps args.Reporter
Expand Down
68 changes: 66 additions & 2 deletions beater/beater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package beater

import (
"compress/zlib"
"context"
"errors"
"net"
Expand All @@ -36,7 +37,9 @@ import (
"go.uber.org/zap/zaptest/observer"

"github.com/elastic/apm-server/beater/config"
"github.com/elastic/apm-server/elasticsearch"
"github.com/elastic/apm-server/model"
"github.com/elastic/apm-server/sourcemap/test"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/instrumentation"
Expand Down Expand Up @@ -246,7 +249,7 @@ func TestTransformConfigIndex(t *testing.T) {
cfg.RumConfig.SourceMapping.IndexPattern = indexPattern
}

transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg)
transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg, nil)
require.NoError(t, err)
require.NotNil(t, transformConfig.RUM.SourcemapStore)
transformConfig.RUM.SourcemapStore.Added(context.Background(), "name", "version", "path")
Expand All @@ -266,7 +269,7 @@ func TestTransformConfig(t *testing.T) {
cfg := config.DefaultConfig()
cfg.RumConfig.Enabled = rumEnabled
cfg.RumConfig.SourceMapping.Enabled = sourcemapEnabled
transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg)
transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg, nil)
require.NoError(t, err)
if expectSourcemapStore {
assert.NotNil(t, transformConfig.RUM.SourcemapStore)
Expand All @@ -280,3 +283,64 @@ func TestTransformConfig(t *testing.T) {
test(true, false, false)
test(true, true, true)
}

func TestStoreUsesRUMElasticsearchConfig(t *testing.T) {
var called bool
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
called = true
w.Write([]byte(test.ValidSourcemap))
}))
defer ts.Close()

cfg := config.DefaultConfig()
cfg.RumConfig.Enabled = true
cfg.RumConfig.SourceMapping.Enabled = true
cfg.RumConfig.SourceMapping.ESConfig = elasticsearch.DefaultConfig()
cfg.RumConfig.SourceMapping.ESConfig.Hosts = []string{ts.URL}

transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg, nil)
require.NoError(t, err)
// Check that the provided rum elasticsearch config was used and
// Fetch() goes to the test server.
_, err = transformConfig.RUM.SourcemapStore.Fetch(context.Background(), "app", "1.0", "/bundle/path")
require.NoError(t, err)

assert.True(t, called)
}

func TestFleetStoreUsed(t *testing.T) {
var called bool
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
called = true
wr := zlib.NewWriter(w)
defer wr.Close()
wr.Write([]byte(test.ValidSourcemap))
}))
defer ts.Close()

cfg := config.DefaultConfig()
cfg.RumConfig.Enabled = true
cfg.RumConfig.SourceMapping.Enabled = true
cfg.RumConfig.SourceMapping.Metadata = []config.SourceMapMetadata{{
ServiceName: "app",
ServiceVersion: "1.0",
BundleFilepath: "/bundle/path",
SourceMapURL: "/my/path",
}}

fleetCfg := &config.Fleet{
Hosts: []string{ts.URL[7:]},
Protocol: "http",
AccessAPIKey: "my-key",
TLS: nil,
}

transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg, fleetCfg)
require.NoError(t, err)
// Check that the provided rum elasticsearch config was used and
// Fetch() goes to the test server.
_, err = transformConfig.RUM.SourcemapStore.Fetch(context.Background(), "app", "1.0", "/bundle/path")
require.NoError(t, err)

assert.True(t, called)
}
17 changes: 17 additions & 0 deletions beater/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ func TestUnpackConfig(t *testing.T) {
MaxRetries: 3,
Backoff: elasticsearch.DefaultBackoffConfig,
},
Metadata: []SourceMapMetadata{},
esConfigured: true,
},
LibraryPattern: "^custom",
Expand Down Expand Up @@ -295,6 +296,14 @@ func TestUnpackConfig(t *testing.T) {
"rum": map[string]interface{}{
"enabled": true,
"source_mapping": map[string]interface{}{
"metadata": []map[string]string{
{
"service.name": "opbeans-rum",
"service.version": "1.2.3",
"bundle.filepath": "/test/e2e/general-usecase/bundle.js.map",
"sourcemap.url": "http://somewhere.com/bundle.js.map",
},
},
"cache": map[string]interface{}{
"expiration": 7,
},
Expand Down Expand Up @@ -376,6 +385,14 @@ func TestUnpackConfig(t *testing.T) {
},
IndexPattern: "apm-*-sourcemap*",
ESConfig: elasticsearch.DefaultConfig(),
Metadata: []SourceMapMetadata{
{
ServiceName: "opbeans-rum",
ServiceVersion: "1.2.3",
BundleFilepath: "/test/e2e/general-usecase/bundle.js.map",
SourceMapURL: "http://somewhere.com/bundle.js.map",
},
},
},
LibraryPattern: "rum",
ExcludeFromGrouping: "^/webpack",
Expand Down
Loading

0 comments on commit 98949ad

Please sign in to comment.