Skip to content

Commit

Permalink
Update tail sampling to require data streams (#5952)
Browse files Browse the repository at this point in the history
* Update tail sampling to require data streams

Tail-sampling now requires data streams to be enabled,
and apm-server will error if tail-sampling is enabled
and data streams are not.

The code for creating a sampled traces data stream when
running in standalone mode has been removed. The data stream
is now expected to be created by installing the integration
package.

Rename the sampled traces data stream from `traces-sampled-*`
to `traces-apm.sampled-*`, to maintain a common "apm" prefix
for all datasets.

The Elasticsearch role used by apm-server in tests has been
updated with access to `traces-apm*`, `metrics-apm*`, and
`logs-apm*`.

* systemtest: fix TestTailSamplingUnlicensed

* Update changelog
  • Loading branch information
axw authored Aug 18, 2021
1 parent 9ea5e3b commit 2c65c70
Show file tree
Hide file tree
Showing 14 changed files with 81 additions and 153 deletions.
2 changes: 1 addition & 1 deletion apmpackage/apm/data_stream/sampled_traces/manifest.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
title: APM tail-sampled traces
type: traces
dataset: sampled
dataset: apm.sampled
ilm_policy: traces-apm.sampled-default_policy
elasticsearch:
index_template:
Expand Down
2 changes: 1 addition & 1 deletion beater/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func NewConfig(ucfg *common.Config, outputESCfg *common.Config) (*Config, error)
return nil, err
}

if err := c.Sampling.Tail.setup(logger, outputESCfg); err != nil {
if err := c.Sampling.Tail.setup(logger, c.DataStreams.Enabled, outputESCfg); err != nil {
return nil, err
}

Expand Down
11 changes: 8 additions & 3 deletions beater/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func TestUnpackConfig(t *testing.T) {
"aggregation.service_destinations.enabled": false,
"sampling.keep_unsampled": false,
"sampling.tail": map[string]interface{}{
"enabled": true,
"enabled": false,
"policies": []map[string]interface{}{{"sample_rate": 0.5}},
"interval": "2m",
"ingest_rate_decay": 1.0,
Expand Down Expand Up @@ -468,7 +468,7 @@ func TestUnpackConfig(t *testing.T) {
Sampling: SamplingConfig{
KeepUnsampled: false,
Tail: TailSamplingConfig{
Enabled: true,
Enabled: false,
Policies: []TailSamplingPolicy{{SampleRate: 0.5}},
ESConfig: elasticsearch.DefaultConfig(),
Interval: 2 * time.Minute,
Expand Down Expand Up @@ -653,7 +653,12 @@ func TestAgentConfigs(t *testing.T) {
}

func TestNewConfig_ESConfig(t *testing.T) {
ucfg, err := common.NewConfigFrom(`{"rum.enabled":true,"api_key.enabled":true,"sampling.tail.policies":[{"sample_rate": 0.5}]}`)
ucfg, err := common.NewConfigFrom(`{
"rum.enabled":true,
"api_key.enabled":true,
"data_streams.enabled":true,
"sampling.tail.policies":[{"sample_rate": 0.5}],
}`)
require.NoError(t, err)

// no es config given
Expand Down
5 changes: 4 additions & 1 deletion beater/config/sampling.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,13 @@ func (c *TailSamplingConfig) Validate() error {
return nil
}

func (c *TailSamplingConfig) setup(log *logp.Logger, outputESCfg *common.Config) error {
func (c *TailSamplingConfig) setup(log *logp.Logger, dataStreamsEnabled bool, outputESCfg *common.Config) error {
if !c.Enabled {
return nil
}
if !dataStreamsEnabled {
return errors.New("tail-sampling requires data streams to be enabled")
}
if !c.esConfigured && outputESCfg != nil {
log.Info("Falling back to elasticsearch output for tail-sampling")
if err := outputESCfg.Unpack(&c.ESConfig); err != nil {
Expand Down
12 changes: 12 additions & 0 deletions beater/config/sampling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
func TestSamplingPoliciesValidation(t *testing.T) {
t.Run("MinimallyValid", func(t *testing.T) {
_, err := NewConfig(common.MustNewConfigFrom(map[string]interface{}{
"data_streams.enabled": true,
"sampling.tail.policies": []map[string]interface{}{{
"sample_rate": 0.5,
}},
Expand All @@ -36,17 +37,28 @@ func TestSamplingPoliciesValidation(t *testing.T) {
})
t.Run("NoPolicies", func(t *testing.T) {
_, err := NewConfig(common.MustNewConfigFrom(map[string]interface{}{
"data_streams.enabled": true,
"sampling.tail.enabled": true,
}), nil)
assert.EqualError(t, err, "Error processing configuration: invalid tail sampling config: no policies specified accessing 'sampling.tail'")
})
t.Run("NoDefaultPolicies", func(t *testing.T) {
_, err := NewConfig(common.MustNewConfigFrom(map[string]interface{}{
"data_streams.enabled": true,
"sampling.tail.policies": []map[string]interface{}{{
"service.name": "foo",
"sample_rate": 0.5,
}},
}), nil)
assert.EqualError(t, err, "Error processing configuration: invalid tail sampling config: no default (empty criteria) policy specified accessing 'sampling.tail'")
})
t.Run("DataStreamsDisabled", func(t *testing.T) {
_, err := NewConfig(common.MustNewConfigFrom(map[string]interface{}{
"sampling.tail.enabled": true,
"sampling.tail.policies": []map[string]interface{}{{
"sample_rate": 0.5,
}},
}), nil)
assert.EqualError(t, err, "tail-sampling requires data streams to be enabled")
})
}
2 changes: 1 addition & 1 deletion beater/waitintegration.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func checkIntegrationInstalledElasticsearch(ctx context.Context, esClient elasti
// TODO(axw) generate the list of expected index templates.
templates := []string{
"traces-apm",
"traces-sampled",
"traces-apm.sampled",
"metrics-apm.app",
"metrics-apm.internal",
"logs-apm.error",
Expand Down
2 changes: 2 additions & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ https://github.com/elastic/apm-server/compare/7.13\...master[View commits]
==== Breaking Changes
- `network.connection_type` is now `network.connection.type` {pull}5671[5671]
- `transaction.page` and `error.page` no longer recorded {pull}5872[5872]
- experimental:["This breaking change applies to the experimental tail-based sampling feature."] `apm-server.sampling.tail` now requires `apm-server.data_streams.enabled` {pull}5952[5952]
- beta:["This breaking change applies to the beta <<apm-integration>>."] The `traces-sampled-*` data stream is now `traces-apm.sampled-*` {pull}5952[5952]

[float]
==== Bug fixes
Expand Down
3 changes: 2 additions & 1 deletion systemtest/apmservertest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ type RUMSourcemapCacheConfig struct {

// DataStreamsConfig holds APM Server data streams configuration.
type DataStreamsConfig struct {
Enabled bool `json:"enabled"`
Enabled bool `json:"enabled"`
WaitForIntegration *bool `json:"wait_for_integration,omitempty"`
}

// APIKeyConfig holds agent auth configuration.
Expand Down
11 changes: 11 additions & 0 deletions systemtest/fleettest/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,17 @@ func (c *Client) Package(name, version string) (*Package, error) {
return &result.Response, nil
}

// InstallPackage installs the package identified by the given name and
// version. It sends a POST request for "/epm/packages/<name>-<version>",
// built with c.newFleetRequest, using http.DefaultClient.
//
// A transport-level error from the HTTP client is returned unchanged.
// Otherwise the response is passed to consumeResponse with a nil destination
// and whatever error that helper reports is returned.
func (c *Client) InstallPackage(name, version string) error {
	endpoint := "/epm/packages/" + name + "-" + version
	request := c.newFleetRequest(http.MethodPost, endpoint, nil)

	response, doErr := http.DefaultClient.Do(request)
	if doErr != nil {
		return doErr
	}
	// Always release the body, regardless of how consumeResponse fares.
	defer response.Body.Close()

	return consumeResponse(response, nil)
}

// DeletePackage deletes (uninstalls) the package with the given name and version.
func (c *Client) DeletePackage(name, version string) error {
req := c.newFleetRequest("DELETE", "/epm/packages/"+name+"-"+version, nil)
Expand Down
30 changes: 27 additions & 3 deletions systemtest/sampling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,13 @@ func TestKeepUnsampledWarning(t *testing.T) {

func TestTailSampling(t *testing.T) {
systemtest.CleanupElasticsearch(t)
cleanupFleet(t, systemtest.Fleet)
integrationPackage := getAPMIntegrationPackage(t, systemtest.Fleet)
err := systemtest.Fleet.InstallPackage(integrationPackage.Name, integrationPackage.Version)
require.NoError(t, err)

srv1 := apmservertest.NewUnstartedServer(t)
srv1.Config.DataStreams = &apmservertest.DataStreamsConfig{Enabled: true}
srv1.Config.Sampling = &apmservertest.SamplingConfig{
Tail: &apmservertest.TailSamplingConfig{
Enabled: true,
Expand All @@ -105,6 +110,7 @@ func TestTailSampling(t *testing.T) {
require.NoError(t, srv1.Start())

srv2 := apmservertest.NewUnstartedServer(t)
srv2.Config.DataStreams = &apmservertest.DataStreamsConfig{Enabled: true}
srv2.Config.Sampling = srv1.Config.Sampling
require.NoError(t, srv2.Start())

Expand All @@ -128,12 +134,12 @@ func TestTailSampling(t *testing.T) {

// Flush the data stream while the test is running, as we have no
// control over the settings for the sampled traces index template.
refreshPeriodically(t, 250*time.Millisecond, "apm-sampled-traces")
refreshPeriodically(t, 250*time.Millisecond, "traces-apm.sampled-*")

for _, transactionType := range []string{"parent", "child"} {
var result estest.SearchResult
t.Logf("waiting for %d %q transactions", expected, transactionType)
_, err := systemtest.Elasticsearch.Search("apm-*").WithQuery(estest.TermQuery{
_, err := systemtest.Elasticsearch.Search("traces-*").WithQuery(estest.TermQuery{
Field: "transaction.type",
Value: transactionType,
}).WithSize(total).Do(context.Background(), &result,
Expand Down Expand Up @@ -172,8 +178,17 @@ func TestTailSamplingUnlicensed(t *testing.T) {
require.NoError(t, es.Start())
defer es.Close()

// Data streams are required for tail-based sampling, but since we're using
// an ephemeral Elasticsearch container it's not straightforward to install
// the integration package. We won't be indexing anything, so just don't wait
// for the integration package to be installed in this test.
waitForIntegration := false
srv := apmservertest.NewUnstartedServer(t)
srv.Config.Output.Elasticsearch.Hosts = []string{es.Addr}
srv.Config.DataStreams = &apmservertest.DataStreamsConfig{
Enabled: true,
WaitForIntegration: &waitForIntegration,
}
srv.Config.Sampling = &apmservertest.SamplingConfig{
Tail: &apmservertest.TailSamplingConfig{
Enabled: true,
Expand All @@ -183,6 +198,15 @@ func TestTailSamplingUnlicensed(t *testing.T) {
}
require.NoError(t, srv.Start())

// Send some transactions to trigger an indexing attempt.
tracer := srv.Tracer()
for i := 0; i < 100; i++ {
tx := tracer.StartTransaction("GET /", "parent")
tx.Duration = time.Second * time.Duration(i+1)
tx.End()
}
tracer.Flush(nil)

timeout := time.After(time.Minute)
logs := srv.Logs.Iterator()
for {
Expand All @@ -198,7 +222,7 @@ func TestTailSamplingUnlicensed(t *testing.T) {

// Due to the failing license check, APM Server will refuse to index anything.
var result estest.SearchResult
_, err = es.Client.Search("apm-*").Do(context.Background(), &result)
_, err = es.Client.Search("traces-apm*").Do(context.Background(), &result)
assert.NoError(t, err)
assert.Empty(t, result.Hits.Hits)
}
Expand Down
2 changes: 1 addition & 1 deletion testing/docker/elasticsearch/roles.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apm_server:
cluster: ['manage_ilm','manage_security','manage_api_key']
indices:
- names: ['apm-*']
- names: ['apm-*', 'traces-apm*', 'logs-apm*', 'metrics-apm*']
privileges: ['write','create_index','manage','manage_ilm']
applications:
- application: 'apm'
Expand Down
37 changes: 10 additions & 27 deletions x-pack/apm-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/elastic/apm-server/x-pack/apm-server/aggregation/txmetrics"
"github.com/elastic/apm-server/x-pack/apm-server/cmd"
"github.com/elastic/apm-server/x-pack/apm-server/sampling"
"github.com/elastic/apm-server/x-pack/apm-server/sampling/pubsub"
)

var (
Expand Down Expand Up @@ -118,6 +117,10 @@ func licensePlatinumCovered(client *eslegclient.Connection) error {
}

func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, error) {
if !args.Config.DataStreams.Enabled {
return nil, errors.New("tail-based sampling requires data streams")
}

// Tail-based sampling is a Platinum-licensed feature.
//
// FIXME(axw) each time libes.RegisterGlobalCallback is called an additional global
Expand All @@ -132,30 +135,6 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er
return nil, errors.Wrap(err, "failed to create Elasticsearch client for tail-sampling")
}

var sampledTracesDataStream sampling.DataStreamConfig
if args.Managed {
// Data stream and ILM policy are managed by Fleet.
sampledTracesDataStream = sampling.DataStreamConfig{
Type: "traces",
Dataset: "sampled",
Namespace: args.Namespace,
}
} else {
sampledTracesDataStream = sampling.DataStreamConfig{
Type: "apm",
Dataset: "sampled",
Namespace: "traces",
}
if err := pubsub.SetupDataStream(context.Background(), es,
"apm-sampled-traces", // Index template
"apm-sampled-traces", // ILM policy
"apm-sampled-traces", // Index pattern
); err != nil {
return nil, errors.Wrap(err, "failed to create data stream for tail-sampling")
}
args.Logger.Infof("Created tail-sampling data stream index template")
}

policies := make([]sampling.Policy, len(tailSamplingConfig.Policies))
for i, in := range tailSamplingConfig.Policies {
policies[i] = sampling.Policy{
Expand All @@ -178,8 +157,12 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er
IngestRateDecayFactor: tailSamplingConfig.IngestRateDecayFactor,
},
RemoteSamplingConfig: sampling.RemoteSamplingConfig{
Elasticsearch: es,
SampledTracesDataStream: sampledTracesDataStream,
Elasticsearch: es,
SampledTracesDataStream: sampling.DataStreamConfig{
Type: "traces",
Dataset: "apm.sampled",
Namespace: args.Namespace,
},
},
StorageConfig: sampling.StorageConfig{
StorageDir: paths.Resolve(paths.Data, tailSamplingConfig.StorageDir),
Expand Down
1 change: 1 addition & 0 deletions x-pack/apm-server/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func TestMonitoring(t *testing.T) {
require.NoError(t, err)

cfg := config.DefaultConfig()
cfg.DataStreams.Enabled = true
cfg.Aggregation.Transactions.Enabled = true
cfg.Sampling.Tail.Enabled = true
cfg.Sampling.Tail.Policies = []config.TailSamplingPolicy{{SampleRate: 0.1}}
Expand Down
Loading

0 comments on commit 2c65c70

Please sign in to comment.