Update tail sampling to require data streams #5952

Merged (3 commits) on Aug 18, 2021
2 changes: 1 addition & 1 deletion apmpackage/apm/data_stream/sampled_traces/manifest.yml
@@ -1,6 +1,6 @@
title: APM tail-sampled traces
type: traces
-dataset: sampled
+dataset: apm.sampled
ilm_policy: traces-apm.sampled-default_policy
elasticsearch:
index_template:
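A note on the rename above: with the APM integration package, events are routed to data streams named <type>-<dataset>-<namespace>, so changing the dataset from "sampled" to "apm.sampled" moves sampled trace documents from traces-sampled-<namespace> to traces-apm.sampled-<namespace>. A minimal sketch of that composition (the helper below is illustrative only, not apm-server code):

```go
package main

import "fmt"

// dataStreamName composes an Elasticsearch data stream name from the
// type/dataset/namespace triple used by the integration package.
// Illustrative helper only, not part of apm-server.
func dataStreamName(typ, dataset, namespace string) string {
	return fmt.Sprintf("%s-%s-%s", typ, dataset, namespace)
}

func main() {
	// Old name with the previous manifest: traces-sampled-default
	fmt.Println(dataStreamName("traces", "sampled", "default"))
	// New name after this change: traces-apm.sampled-default
	fmt.Println(dataStreamName("traces", "apm.sampled", "default"))
}
```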
2 changes: 1 addition & 1 deletion beater/config/config.go
@@ -118,7 +118,7 @@ func NewConfig(ucfg *common.Config, outputESCfg *common.Config) (*Config, error)
return nil, err
}

-if err := c.Sampling.Tail.setup(logger, outputESCfg); err != nil {
+if err := c.Sampling.Tail.setup(logger, c.DataStreams.Enabled, outputESCfg); err != nil {
return nil, err
}

11 changes: 8 additions & 3 deletions beater/config/config_test.go
@@ -342,7 +342,7 @@ func TestUnpackConfig(t *testing.T) {
"aggregation.service_destinations.enabled": false,
"sampling.keep_unsampled": false,
"sampling.tail": map[string]interface{}{
"enabled": true,
"enabled": false,
"policies": []map[string]interface{}{{"sample_rate": 0.5}},
"interval": "2m",
"ingest_rate_decay": 1.0,
@@ -468,7 +468,7 @@ func TestUnpackConfig(t *testing.T) {
Sampling: SamplingConfig{
KeepUnsampled: false,
Tail: TailSamplingConfig{
-Enabled: true,
+Enabled: false,
Policies: []TailSamplingPolicy{{SampleRate: 0.5}},
ESConfig: elasticsearch.DefaultConfig(),
Interval: 2 * time.Minute,
@@ -653,7 +653,12 @@ func TestAgentConfigs(t *testing.T) {
}

func TestNewConfig_ESConfig(t *testing.T) {
ucfg, err := common.NewConfigFrom(`{"rum.enabled":true,"api_key.enabled":true,"sampling.tail.policies":[{"sample_rate": 0.5}]}`)
ucfg, err := common.NewConfigFrom(`{
"rum.enabled":true,
"api_key.enabled":true,
"data_streams.enabled":true,
"sampling.tail.policies":[{"sample_rate": 0.5}],
}`)
require.NoError(t, err)

// no es config given
5 changes: 4 additions & 1 deletion beater/config/sampling.go
@@ -108,10 +108,13 @@ func (c *TailSamplingConfig) Validate() error {
return nil
}

-func (c *TailSamplingConfig) setup(log *logp.Logger, outputESCfg *common.Config) error {
+func (c *TailSamplingConfig) setup(log *logp.Logger, dataStreamsEnabled bool, outputESCfg *common.Config) error {
if !c.Enabled {
return nil
}
+if !dataStreamsEnabled {
+return errors.New("tail-sampling requires data streams to be enabled")
+}
if !c.esConfigured && outputESCfg != nil {
log.Info("Falling back to elasticsearch output for tail-sampling")
if err := outputESCfg.Unpack(&c.ESConfig); err != nil {
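The practical effect of the new guard, sketched with the same packages the config tests use (assuming github.com/elastic/apm-server/beater/config and libbeat's common package are importable at these paths; the error text is the one added above):

```go
package main

import (
	"fmt"

	"github.com/elastic/beats/v7/libbeat/common"

	"github.com/elastic/apm-server/beater/config"
)

func main() {
	// Tail sampling enabled without data streams: NewConfig should now fail.
	ucfg := common.MustNewConfigFrom(map[string]interface{}{
		"sampling.tail.enabled":  true,
		"sampling.tail.policies": []map[string]interface{}{{"sample_rate": 0.5}},
	})
	if _, err := config.NewConfig(ucfg, nil); err != nil {
		fmt.Println(err) // tail-sampling requires data streams to be enabled
	}

	// Adding data_streams.enabled satisfies the new requirement.
	ucfg = common.MustNewConfigFrom(map[string]interface{}{
		"data_streams.enabled":   true,
		"sampling.tail.enabled":  true,
		"sampling.tail.policies": []map[string]interface{}{{"sample_rate": 0.5}},
	})
	if _, err := config.NewConfig(ucfg, nil); err == nil {
		fmt.Println("config accepted")
	}
}
```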
12 changes: 12 additions & 0 deletions beater/config/sampling_test.go
@@ -28,6 +28,7 @@ import (
func TestSamplingPoliciesValidation(t *testing.T) {
t.Run("MinimallyValid", func(t *testing.T) {
_, err := NewConfig(common.MustNewConfigFrom(map[string]interface{}{
"data_streams.enabled": true,
"sampling.tail.policies": []map[string]interface{}{{
"sample_rate": 0.5,
}},
@@ -36,17 +37,28 @@ func TestSamplingPoliciesValidation(t *testing.T) {
})
t.Run("NoPolicies", func(t *testing.T) {
_, err := NewConfig(common.MustNewConfigFrom(map[string]interface{}{
"data_streams.enabled": true,
"sampling.tail.enabled": true,
}), nil)
assert.EqualError(t, err, "Error processing configuration: invalid tail sampling config: no policies specified accessing 'sampling.tail'")
})
t.Run("NoDefaultPolicies", func(t *testing.T) {
_, err := NewConfig(common.MustNewConfigFrom(map[string]interface{}{
"data_streams.enabled": true,
"sampling.tail.policies": []map[string]interface{}{{
"service.name": "foo",
"sample_rate": 0.5,
}},
}), nil)
assert.EqualError(t, err, "Error processing configuration: invalid tail sampling config: no default (empty criteria) policy specified accessing 'sampling.tail'")
})
t.Run("DataStreamsDisabled", func(t *testing.T) {
_, err := NewConfig(common.MustNewConfigFrom(map[string]interface{}{
"sampling.tail.enabled": true,
"sampling.tail.policies": []map[string]interface{}{{
"sample_rate": 0.5,
}},
}), nil)
assert.EqualError(t, err, "tail-sampling requires data streams to be enabled")
})
}
2 changes: 1 addition & 1 deletion beater/waitintegration.go
@@ -119,7 +119,7 @@ func checkIntegrationInstalledElasticsearch(ctx context.Context, esClient elasti
// TODO(axw) generate the list of expected index templates.
templates := []string{
"traces-apm",
"traces-sampled",
"traces-apm.sampled",
"metrics-apm.app",
"metrics-apm.internal",
"logs-apm.error",
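For reference, each entry in the templates list corresponds to an index template that the APM integration package installs; the check above essentially polls for their existence. A rough standalone sketch of such a check over plain HTTP (the localhost URL and lack of authentication are assumptions, not apm-server code):

```go
package main

import (
	"fmt"
	"net/http"
)

// indexTemplateExists reports whether an index template with the given name
// is installed, using the Elasticsearch index template exists API
// (HEAD /_index_template/<name>). Illustrative only.
func indexTemplateExists(esURL, name string) (bool, error) {
	resp, err := http.Head(esURL + "/_index_template/" + name)
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()
	return resp.StatusCode == http.StatusOK, nil
}

func main() {
	ok, err := indexTemplateExists("http://localhost:9200", "traces-apm.sampled")
	fmt.Println(ok, err)
}
```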
2 changes: 2 additions & 0 deletions changelogs/head.asciidoc
@@ -7,6 +7,8 @@ https://github.com/elastic/apm-server/compare/7.13\...master[View commits]
==== Breaking Changes
- `network.connection_type` is now `network.connection.type` {pull}5671[5671]
- `transaction.page` and `error.page` no longer recorded {pull}5872[5872]
+- experimental:["This breaking change applies to the experimental tail-based sampling feature."] `apm-server.sampling.tail` now requires `apm-server.data_streams.enabled` {pull}5952[5952]
+- beta:["This breaking change applies to the beta <<apm-integration>>."] The `traces-sampled-*` data stream is now `traces-apm.sampled-*` {pull}5952[5952]

[float]
==== Bug fixes
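Anything that referenced the old sampled-traces data stream by name (searches, ILM policies, dashboards) needs to target the new pattern. A minimal sketch of querying the renamed data stream over plain HTTP (the localhost URL is an assumption; the stream holds the sampled trace IDs used to coordinate tail sampling between APM Servers):

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Previously: http://localhost:9200/traces-sampled-*/_search
	resp, err := http.Get("http://localhost:9200/traces-apm.sampled-*/_search")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body))
}
```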
3 changes: 2 additions & 1 deletion systemtest/apmservertest/config.go
@@ -198,7 +198,8 @@ type RUMSourcemapCacheConfig struct {

// DataStreamsConfig holds APM Server data streams configuration.
type DataStreamsConfig struct {
-Enabled bool `json:"enabled"`
+Enabled            bool  `json:"enabled"`
+WaitForIntegration *bool `json:"wait_for_integration,omitempty"`
}

// APIKeyConfig holds agent auth configuration.
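WaitForIntegration is a pointer so the test harness can distinguish "unset" from an explicit false: a nil pointer is omitted from the generated apm-server configuration (keeping the server default), while an explicit value is written out. A small sketch of that behaviour (the struct below merely mirrors the one above for illustration):

```go
package main

import (
	"encoding/json"
	"fmt"
)

type dataStreamsConfig struct {
	Enabled            bool  `json:"enabled"`
	WaitForIntegration *bool `json:"wait_for_integration,omitempty"`
}

func main() {
	wait := false
	unset, _ := json.Marshal(dataStreamsConfig{Enabled: true})
	explicit, _ := json.Marshal(dataStreamsConfig{Enabled: true, WaitForIntegration: &wait})
	fmt.Println(string(unset))    // {"enabled":true}
	fmt.Println(string(explicit)) // {"enabled":true,"wait_for_integration":false}
}
```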
11 changes: 11 additions & 0 deletions systemtest/fleettest/client.go
@@ -239,6 +239,17 @@ func (c *Client) Package(name, version string) (*Package, error) {
return &result.Response, nil
}

+// InstallPackage installs the package with the given name and version.
+func (c *Client) InstallPackage(name, version string) error {
+req := c.newFleetRequest("POST", "/epm/packages/"+name+"-"+version, nil)
+resp, err := http.DefaultClient.Do(req)
+if err != nil {
+return err
+}
+defer resp.Body.Close()
+return consumeResponse(resp, nil)
+}

// DeletePackage deletes (uninstalls) the package with the given name and version.
func (c *Client) DeletePackage(name, version string) error {
req := c.newFleetRequest("DELETE", "/epm/packages/"+name+"-"+version, nil)
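Typical usage in a system test pairs the new InstallPackage with the existing DeletePackage for cleanup, much like the setup added to TestTailSampling below. A sketch that assumes it lives next to that test, so the getAPMIntegrationPackage helper and systemtest.Fleet are in scope:

```go
// installAPMIntegrationPackage installs the APM integration package via Fleet
// so that Fleet-managed index templates (including traces-apm.sampled) exist
// before the test indexes any events, and uninstalls it again afterwards.
func installAPMIntegrationPackage(t *testing.T) {
	pkg := getAPMIntegrationPackage(t, systemtest.Fleet)
	err := systemtest.Fleet.InstallPackage(pkg.Name, pkg.Version)
	require.NoError(t, err)
	t.Cleanup(func() {
		err := systemtest.Fleet.DeletePackage(pkg.Name, pkg.Version)
		require.NoError(t, err)
	})
}
```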
30 changes: 27 additions & 3 deletions systemtest/sampling_test.go
@@ -92,8 +92,13 @@ func TestKeepUnsampledWarning(t *testing.T) {

func TestTailSampling(t *testing.T) {
systemtest.CleanupElasticsearch(t)
+cleanupFleet(t, systemtest.Fleet)
+integrationPackage := getAPMIntegrationPackage(t, systemtest.Fleet)
+err := systemtest.Fleet.InstallPackage(integrationPackage.Name, integrationPackage.Version)
+require.NoError(t, err)

srv1 := apmservertest.NewUnstartedServer(t)
+srv1.Config.DataStreams = &apmservertest.DataStreamsConfig{Enabled: true}
srv1.Config.Sampling = &apmservertest.SamplingConfig{
Tail: &apmservertest.TailSamplingConfig{
Enabled: true,
@@ -105,6 +110,7 @@
require.NoError(t, srv1.Start())

srv2 := apmservertest.NewUnstartedServer(t)
+srv2.Config.DataStreams = &apmservertest.DataStreamsConfig{Enabled: true}
srv2.Config.Sampling = srv1.Config.Sampling
require.NoError(t, srv2.Start())

@@ -128,12 +134,12 @@

// Flush the data stream while the test is running, as we have no
// control over the settings for the sampled traces index template.
refreshPeriodically(t, 250*time.Millisecond, "apm-sampled-traces")
refreshPeriodically(t, 250*time.Millisecond, "traces-apm.sampled-*")

for _, transactionType := range []string{"parent", "child"} {
var result estest.SearchResult
t.Logf("waiting for %d %q transactions", expected, transactionType)
_, err := systemtest.Elasticsearch.Search("apm-*").WithQuery(estest.TermQuery{
_, err := systemtest.Elasticsearch.Search("traces-*").WithQuery(estest.TermQuery{
Field: "transaction.type",
Value: transactionType,
}).WithSize(total).Do(context.Background(), &result,
@@ -172,8 +178,17 @@ func TestTailSamplingUnlicensed(t *testing.T) {
require.NoError(t, es.Start())
defer es.Close()

+// Data streams are required for tail-based sampling, but since we're using
+// an ephemeral Elasticsearch container it's not straightforward to install
+// the integration package. We won't be indexing anything, so just don't wait
+// for the integration package to be installed in this test.
+waitForIntegration := false
srv := apmservertest.NewUnstartedServer(t)
srv.Config.Output.Elasticsearch.Hosts = []string{es.Addr}
+srv.Config.DataStreams = &apmservertest.DataStreamsConfig{
+Enabled: true,
+WaitForIntegration: &waitForIntegration,
+}
srv.Config.Sampling = &apmservertest.SamplingConfig{
Tail: &apmservertest.TailSamplingConfig{
Enabled: true,
@@ -183,6 +198,15 @@
}
require.NoError(t, srv.Start())

+// Send some transactions to trigger an indexing attempt.
+tracer := srv.Tracer()
+for i := 0; i < 100; i++ {
+tx := tracer.StartTransaction("GET /", "parent")
+tx.Duration = time.Second * time.Duration(i+1)
+tx.End()
+}
+tracer.Flush(nil)

timeout := time.After(time.Minute)
logs := srv.Logs.Iterator()
for {
@@ -198,7 +222,7 @@

// Due to the failing license check, APM Server will refuse to index anything.
var result estest.SearchResult
_, err = es.Client.Search("apm-*").Do(context.Background(), &result)
_, err = es.Client.Search("traces-apm*").Do(context.Background(), &result)
assert.NoError(t, err)
assert.Empty(t, result.Hits.Hits)
}
2 changes: 1 addition & 1 deletion testing/docker/elasticsearch/roles.yml
@@ -1,7 +1,7 @@
apm_server:
cluster: ['manage_ilm','manage_security','manage_api_key']
indices:
-- names: ['apm-*']
+- names: ['apm-*', 'traces-apm*', 'logs-apm*', 'metrics-apm*']
privileges: ['write','create_index','manage','manage_ilm']
applications:
- application: 'apm'
37 changes: 10 additions & 27 deletions x-pack/apm-server/main.go
@@ -26,7 +26,6 @@ import (
"github.com/elastic/apm-server/x-pack/apm-server/aggregation/txmetrics"
"github.com/elastic/apm-server/x-pack/apm-server/cmd"
"github.com/elastic/apm-server/x-pack/apm-server/sampling"
"github.com/elastic/apm-server/x-pack/apm-server/sampling/pubsub"
)

var (
@@ -118,6 +117,10 @@ func licensePlatinumCovered(client *eslegclient.Connection) error {
}

func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, error) {
+if !args.Config.DataStreams.Enabled {
+return nil, errors.New("tail-based sampling requires data streams")
+}

// Tail-based sampling is a Platinum-licensed feature.
//
// FIXME(axw) each time libes.RegisterGlobalCallback is called an additional global
@@ -132,30 +135,6 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er
return nil, errors.Wrap(err, "failed to create Elasticsearch client for tail-sampling")
}

-var sampledTracesDataStream sampling.DataStreamConfig
-if args.Managed {
-// Data stream and ILM policy are managed by Fleet.
-sampledTracesDataStream = sampling.DataStreamConfig{
-Type: "traces",
-Dataset: "sampled",
-Namespace: args.Namespace,
-}
-} else {
-sampledTracesDataStream = sampling.DataStreamConfig{
-Type: "apm",
-Dataset: "sampled",
-Namespace: "traces",
-}
-if err := pubsub.SetupDataStream(context.Background(), es,
-"apm-sampled-traces", // Index template
-"apm-sampled-traces", // ILM policy
-"apm-sampled-traces", // Index pattern
-); err != nil {
-return nil, errors.Wrap(err, "failed to create data stream for tail-sampling")
-}
-args.Logger.Infof("Created tail-sampling data stream index template")
-}

policies := make([]sampling.Policy, len(tailSamplingConfig.Policies))
for i, in := range tailSamplingConfig.Policies {
policies[i] = sampling.Policy{
@@ -178,8 +157,12 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er
IngestRateDecayFactor: tailSamplingConfig.IngestRateDecayFactor,
},
RemoteSamplingConfig: sampling.RemoteSamplingConfig{
-Elasticsearch: es,
-SampledTracesDataStream: sampledTracesDataStream,
+Elasticsearch: es,
+SampledTracesDataStream: sampling.DataStreamConfig{
+Type: "traces",
+Dataset: "apm.sampled",
+Namespace: args.Namespace,
+},
},
StorageConfig: sampling.StorageConfig{
StorageDir: paths.Resolve(paths.Data, tailSamplingConfig.StorageDir),
1 change: 1 addition & 0 deletions x-pack/apm-server/main_test.go
@@ -63,6 +63,7 @@ func TestMonitoring(t *testing.T) {
require.NoError(t, err)

cfg := config.DefaultConfig()
+cfg.DataStreams.Enabled = true
cfg.Aggregation.Transactions.Enabled = true
cfg.Sampling.Tail.Enabled = true
cfg.Sampling.Tail.Policies = []config.TailSamplingPolicy{{SampleRate: 0.1}}