diff --git a/internal/beater/beater.go b/internal/beater/beater.go
index 758eac2c089..e05ef861248 100644
--- a/internal/beater/beater.go
+++ b/internal/beater/beater.go
@@ -633,7 +633,6 @@ func (s *Runner) newFinalBatchProcessor(
 	newElasticsearchClient func(cfg *elasticsearch.Config) (*elasticsearch.Client, error),
 	memLimit float64,
 ) (modelpb.BatchProcessor, func(context.Context) error, error) {
 
-	monitoring.Default.Remove("libbeat")
 	libbeatMonitoringRegistry := monitoring.Default.NewRegistry("libbeat")
 	if s.elasticsearchOutputConfig == nil {
@@ -649,59 +648,16 @@ func (s *Runner) newFinalBatchProcessor(
 	}
 	monitoring.NewString(outputRegistry, "name").Set("elasticsearch")
 
-	var esConfig struct {
-		*elasticsearch.Config `config:",inline"`
-		FlushBytes            string        `config:"flush_bytes"`
-		FlushInterval         time.Duration `config:"flush_interval"`
-		MaxRequests           int           `config:"max_requests"`
-		Scaling               struct {
-			Enabled *bool `config:"enabled"`
-		} `config:"autoscaling"`
-	}
-	esConfig.FlushInterval = time.Second
-	esConfig.Config = elasticsearch.DefaultConfig()
-	esConfig.MaxIdleConnsPerHost = 10
-	if err := s.elasticsearchOutputConfig.Unpack(&esConfig); err != nil {
+	// Create the docappender and Elasticsearch config
+	appenderCfg, esCfg, err := s.newDocappenderConfig(tracer, memLimit)
+	if err != nil {
 		return nil, nil, err
 	}
-
-	if esConfig.MaxRequests != 0 {
-		esConfig.MaxIdleConnsPerHost = esConfig.MaxRequests
-	}
-
-	var flushBytes int
-	if esConfig.FlushBytes != "" {
-		b, err := humanize.ParseBytes(esConfig.FlushBytes)
-		if err != nil {
-			return nil, nil, fmt.Errorf("failed to parse flush_bytes: %w", err)
-		}
-		flushBytes = int(b)
-	}
-	minFlush := 24 * 1024
-	if esConfig.CompressionLevel != 0 && flushBytes < minFlush {
-		s.logger.Warnf("flush_bytes config value is too small (%d) and might be ignored by the indexer, increasing value to %d", flushBytes, minFlush)
-		flushBytes = minFlush
-	}
-	client, err := newElasticsearchClient(esConfig.Config)
+	client, err := newElasticsearchClient(esCfg)
 	if err != nil {
 		return nil, nil, err
 	}
-	var scalingCfg docappender.ScalingConfig
-	if enabled := esConfig.Scaling.Enabled; enabled != nil {
-		scalingCfg.Disabled = !*enabled
-	}
-	opts := docappender.Config{
-		CompressionLevel:  esConfig.CompressionLevel,
-		FlushBytes:        flushBytes,
-		FlushInterval:     esConfig.FlushInterval,
-		Tracer:            tracer,
-		MaxRequests:       esConfig.MaxRequests,
-		Scaling:           scalingCfg,
-		Logger:            zap.New(s.logger.Core(), zap.WithCaller(true)),
-		RequireDataStream: true,
-	}
-	opts = docappenderConfig(opts, memLimit, s.logger)
-	appender, err := docappender.New(client, opts)
+	appender, err := docappender.New(client, appenderCfg)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -764,6 +720,62 @@ func (s *Runner) newFinalBatchProcessor(
 	return newDocappenderBatchProcessor(appender), appender.Close, nil
 }
 
+func (s *Runner) newDocappenderConfig(tracer *apm.Tracer, memLimit float64) (
+	docappender.Config, *elasticsearch.Config, error,
+) {
+	var esConfig struct {
+		*elasticsearch.Config `config:",inline"`
+		FlushBytes            string        `config:"flush_bytes"`
+		FlushInterval         time.Duration `config:"flush_interval"`
+		MaxRequests           int           `config:"max_requests"`
+		Scaling               struct {
+			Enabled *bool `config:"enabled"`
+		} `config:"autoscaling"`
+	}
+	// Default to 1mib flushes, which is the default for go-docappender.
+	esConfig.FlushBytes = "1 mib"
+	esConfig.FlushInterval = time.Second
+	esConfig.Config = elasticsearch.DefaultConfig()
+	esConfig.MaxIdleConnsPerHost = 10
+
+	if err := s.elasticsearchOutputConfig.Unpack(&esConfig); err != nil {
+		return docappender.Config{}, nil, err
+	}
+
+	var flushBytes int
+	if esConfig.FlushBytes != "" {
+		b, err := humanize.ParseBytes(esConfig.FlushBytes)
+		if err != nil {
+			return docappender.Config{}, nil, fmt.Errorf("failed to parse flush_bytes: %w", err)
+		}
+		flushBytes = int(b)
+	}
+	minFlush := 24 * 1024
+	if esConfig.CompressionLevel != 0 && flushBytes < minFlush {
+		s.logger.Warnf("flush_bytes config value is too small (%d) and might be ignored by the indexer, increasing value to %d", flushBytes, minFlush)
+		flushBytes = minFlush
+	}
+	var scalingCfg docappender.ScalingConfig
+	if enabled := esConfig.Scaling.Enabled; enabled != nil {
+		scalingCfg.Disabled = !*enabled
+	}
+	cfg := docappenderConfig(docappender.Config{
+		CompressionLevel:  esConfig.CompressionLevel,
+		FlushBytes:        flushBytes,
+		FlushInterval:     esConfig.FlushInterval,
+		Tracer:            tracer,
+		MaxRequests:       esConfig.MaxRequests,
+		Scaling:           scalingCfg,
+		Logger:            zap.New(s.logger.Core(), zap.WithCaller(true)),
+		RequireDataStream: true,
+	}, memLimit, s.logger)
+	if cfg.MaxRequests != 0 {
+		esConfig.MaxIdleConnsPerHost = cfg.MaxRequests
+	}
+
+	return cfg, esConfig.Config, nil
+}
+
 func docappenderConfig(
 	opts docappender.Config, memLimit float64, logger *logp.Logger,
 ) docappender.Config {
diff --git a/internal/beater/beater_test.go b/internal/beater/beater_test.go
index ce968f02eba..3e911b35d8b 100644
--- a/internal/beater/beater_test.go
+++ b/internal/beater/beater_test.go
@@ -29,10 +29,14 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"go.elastic.co/apm/v2/apmtest"
+	"go.uber.org/zap"
 
 	"github.com/elastic/apm-server/internal/beater/config"
 	"github.com/elastic/apm-server/internal/elasticsearch"
+	agentconfig "github.com/elastic/elastic-agent-libs/config"
+	"github.com/elastic/elastic-agent-libs/logp"
 	"github.com/elastic/elastic-agent-libs/monitoring"
+	"github.com/elastic/go-docappender/v2"
 )
 
 func TestStoreUsesRUMElasticsearchConfig(t *testing.T) {
@@ -152,3 +156,74 @@ func newMockClusterUUIDClient(t testing.TB, clusterUUID string) *elasticsearch.C
 	require.NoError(t, err)
 	return client
 }
+
+func TestRunnerNewDocappenderConfig(t *testing.T) {
+	var tc = []struct {
+		memSize         float64
+		wantMaxRequests int
+		wantDocBufSize  int
+	}{
+		{memSize: 1, wantMaxRequests: 11, wantDocBufSize: 819},
+		{memSize: 2, wantMaxRequests: 13, wantDocBufSize: 1638},
+		{memSize: 4, wantMaxRequests: 16, wantDocBufSize: 3276},
+		{memSize: 8, wantMaxRequests: 22, wantDocBufSize: 6553},
+	}
+	for _, c := range tc {
+		t.Run(fmt.Sprintf("default/%vgb", c.memSize), func(t *testing.T) {
+			r := Runner{
+				elasticsearchOutputConfig: agentconfig.NewConfig(),
+				logger:                    logp.NewLogger("test"),
+			}
+			docCfg, esCfg, err := r.newDocappenderConfig(nil, c.memSize)
+			require.NoError(t, err)
+			assert.Equal(t, docappender.Config{
+				Logger:             zap.New(r.logger.Core(), zap.WithCaller(true)),
+				CompressionLevel:   5,
+				RequireDataStream:  true,
+				FlushInterval:      time.Second,
+				FlushBytes:         1024 * 1024,
+				MaxRequests:        c.wantMaxRequests,
+				DocumentBufferSize: c.wantDocBufSize,
+			}, docCfg)
+			assert.Equal(t, &elasticsearch.Config{
+				Hosts:               elasticsearch.Hosts{"localhost:9200"},
+				Backoff:             elasticsearch.DefaultBackoffConfig,
+				Protocol:            "http",
+				CompressionLevel:    5,
+				Timeout:             5 * time.Second,
+				MaxRetries:          3,
+				MaxIdleConnsPerHost: c.wantMaxRequests,
+			}, esCfg)
+		})
+		t.Run(fmt.Sprintf("override/%vgb", c.memSize), func(t *testing.T) {
+			r := Runner{
+				elasticsearchOutputConfig: agentconfig.MustNewConfigFrom(map[string]interface{}{
+					"flush_bytes":    "500 kib",
+					"flush_interval": "2s",
+					"max_requests":   50,
+				}),
+				logger: logp.NewLogger("test"),
+			}
+			docCfg, esCfg, err := r.newDocappenderConfig(nil, c.memSize)
+			require.NoError(t, err)
+			assert.Equal(t, docappender.Config{
+				Logger:             zap.New(r.logger.Core(), zap.WithCaller(true)),
+				CompressionLevel:   5,
+				RequireDataStream:  true,
+				FlushInterval:      2 * time.Second,
+				FlushBytes:         500 * 1024,
+				MaxRequests:        50,
+				DocumentBufferSize: c.wantDocBufSize,
+			}, docCfg)
+			assert.Equal(t, &elasticsearch.Config{
+				Hosts:               elasticsearch.Hosts{"localhost:9200"},
+				Backoff:             elasticsearch.DefaultBackoffConfig,
+				Protocol:            "http",
+				CompressionLevel:    5,
+				Timeout:             5 * time.Second,
+				MaxRetries:          3,
+				MaxIdleConnsPerHost: 50,
+			}, esCfg)
+		})
+	}
+}