Skip to content

Commit

Permalink
fix: Update FlushBytes parsing/defaults
Browse files Browse the repository at this point in the history
Updates the `FlushBytes` setting to default to 1 MiB and only raise it
to the 24 KiB minimum if the user has explicitly set it below 24 KiB.

Fixes elastic#13024

Signed-off-by: Marc Lopez Rubio <[email protected]>
  • Loading branch information
marclop committed Jul 4, 2024
1 parent f8c1d94 commit 62f2fe2
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 49 deletions.
110 changes: 61 additions & 49 deletions internal/beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,6 @@ func (s *Runner) newFinalBatchProcessor(
newElasticsearchClient func(cfg *elasticsearch.Config) (*elasticsearch.Client, error),
memLimit float64,
) (modelpb.BatchProcessor, func(context.Context) error, error) {

monitoring.Default.Remove("libbeat")
libbeatMonitoringRegistry := monitoring.Default.NewRegistry("libbeat")
if s.elasticsearchOutputConfig == nil {
Expand All @@ -649,59 +648,16 @@ func (s *Runner) newFinalBatchProcessor(
}
monitoring.NewString(outputRegistry, "name").Set("elasticsearch")

var esConfig struct {
*elasticsearch.Config `config:",inline"`
FlushBytes string `config:"flush_bytes"`
FlushInterval time.Duration `config:"flush_interval"`
MaxRequests int `config:"max_requests"`
Scaling struct {
Enabled *bool `config:"enabled"`
} `config:"autoscaling"`
}
esConfig.FlushInterval = time.Second
esConfig.Config = elasticsearch.DefaultConfig()
esConfig.MaxIdleConnsPerHost = 10
if err := s.elasticsearchOutputConfig.Unpack(&esConfig); err != nil {
// Create the docappender and Elasticsearch config
appenderCfg, esCfg, err := s.newDocappenderConfig(tracer, memLimit)
if err != nil {
return nil, nil, err
}

if esConfig.MaxRequests != 0 {
esConfig.MaxIdleConnsPerHost = esConfig.MaxRequests
}

var flushBytes int
if esConfig.FlushBytes != "" {
b, err := humanize.ParseBytes(esConfig.FlushBytes)
if err != nil {
return nil, nil, fmt.Errorf("failed to parse flush_bytes: %w", err)
}
flushBytes = int(b)
}
minFlush := 24 * 1024
if esConfig.CompressionLevel != 0 && flushBytes < minFlush {
s.logger.Warnf("flush_bytes config value is too small (%d) and might be ignored by the indexer, increasing value to %d", flushBytes, minFlush)
flushBytes = minFlush
}
client, err := newElasticsearchClient(esConfig.Config)
client, err := newElasticsearchClient(esCfg)
if err != nil {
return nil, nil, err
}
var scalingCfg docappender.ScalingConfig
if enabled := esConfig.Scaling.Enabled; enabled != nil {
scalingCfg.Disabled = !*enabled
}
opts := docappender.Config{
CompressionLevel: esConfig.CompressionLevel,
FlushBytes: flushBytes,
FlushInterval: esConfig.FlushInterval,
Tracer: tracer,
MaxRequests: esConfig.MaxRequests,
Scaling: scalingCfg,
Logger: zap.New(s.logger.Core(), zap.WithCaller(true)),
RequireDataStream: true,
}
opts = docappenderConfig(opts, memLimit, s.logger)
appender, err := docappender.New(client, opts)
appender, err := docappender.New(client, appenderCfg)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -764,6 +720,62 @@ func (s *Runner) newFinalBatchProcessor(
return newDocappenderBatchProcessor(appender), appender.Close, nil
}

// newDocappenderConfig unpacks the Elasticsearch output configuration and
// derives both the docappender.Config used by the final batch processor and
// the Elasticsearch client configuration.
//
// flush_bytes defaults to 1 MiB (go-docappender's own default); when
// compression is enabled and the user explicitly configured a smaller value,
// it is raised to a 24 KiB minimum so the indexer does not ignore it.
func (s *Runner) newDocappenderConfig(tracer *apm.Tracer, memLimit float64) (
	docappender.Config, *elasticsearch.Config, error,
) {
	var esConfig struct {
		*elasticsearch.Config `config:",inline"`
		FlushBytes            string        `config:"flush_bytes"`
		FlushInterval         time.Duration `config:"flush_interval"`
		MaxRequests           int           `config:"max_requests"`
		Scaling               struct {
			Enabled *bool `config:"enabled"`
		} `config:"autoscaling"`
	}
	// Default to 1mib flushes, which is the default for go-docappender.
	// NOTE: this assignment must not be commented out; without it an unset
	// flush_bytes is 0 and gets bumped to the 24 KiB minimum below, which
	// contradicts the intended 1 MiB default (see TestRunnerNewDocappenderConfig).
	esConfig.FlushBytes = "1 mib"
	esConfig.FlushInterval = time.Second
	esConfig.Config = elasticsearch.DefaultConfig()
	esConfig.MaxIdleConnsPerHost = 10

	if err := s.elasticsearchOutputConfig.Unpack(&esConfig); err != nil {
		return docappender.Config{}, nil, err
	}

	var flushBytes int
	if esConfig.FlushBytes != "" {
		b, err := humanize.ParseBytes(esConfig.FlushBytes)
		if err != nil {
			return docappender.Config{}, nil, fmt.Errorf("failed to parse flush_bytes: %w", err)
		}
		flushBytes = int(b)
	}
	// Enforce a 24 KiB floor when compression is on: smaller buffers might be
	// ignored by the indexer.
	minFlush := 24 * 1024
	if esConfig.CompressionLevel != 0 && flushBytes < minFlush {
		s.logger.Warnf("flush_bytes config value is too small (%d) and might be ignored by the indexer, increasing value to %d", flushBytes, minFlush)
		flushBytes = minFlush
	}
	var scalingCfg docappender.ScalingConfig
	if enabled := esConfig.Scaling.Enabled; enabled != nil {
		scalingCfg.Disabled = !*enabled
	}
	cfg := docappenderConfig(docappender.Config{
		CompressionLevel:  esConfig.CompressionLevel,
		FlushBytes:        flushBytes,
		FlushInterval:     esConfig.FlushInterval,
		Tracer:            tracer,
		MaxRequests:       esConfig.MaxRequests,
		Scaling:           scalingCfg,
		Logger:            zap.New(s.logger.Core(), zap.WithCaller(true)),
		RequireDataStream: true,
	}, memLimit, s.logger)
	// Mirror the (possibly memory-scaled) request concurrency in the ES
	// client connection pool so each concurrent bulk request can hold an
	// idle connection.
	if cfg.MaxRequests != 0 {
		esConfig.MaxIdleConnsPerHost = cfg.MaxRequests
	}

	return cfg, esConfig.Config, nil
}

func docappenderConfig(
opts docappender.Config, memLimit float64, logger *logp.Logger,
) docappender.Config {
Expand Down
75 changes: 75 additions & 0 deletions internal/beater/beater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,14 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.elastic.co/apm/v2/apmtest"
"go.uber.org/zap"

"github.com/elastic/apm-server/internal/beater/config"
"github.com/elastic/apm-server/internal/elasticsearch"
agentconfig "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/go-docappender/v2"
)

func TestStoreUsesRUMElasticsearchConfig(t *testing.T) {
Expand Down Expand Up @@ -152,3 +156,74 @@ func newMockClusterUUIDClient(t testing.TB, clusterUUID string) *elasticsearch.C
require.NoError(t, err)
return client
}

// TestRunnerNewDocappenderConfig verifies that newDocappenderConfig derives
// the expected docappender and Elasticsearch client configurations across a
// range of memory limits, both with the defaults and with explicit
// flush/request overrides.
func TestRunnerNewDocappenderConfig(t *testing.T) {
	testCases := []struct {
		memSize         float64
		wantMaxRequests int
		wantDocBufSize  int
	}{
		{memSize: 1, wantMaxRequests: 11, wantDocBufSize: 819},
		{memSize: 2, wantMaxRequests: 13, wantDocBufSize: 1638},
		{memSize: 4, wantMaxRequests: 16, wantDocBufSize: 3276},
		{memSize: 8, wantMaxRequests: 22, wantDocBufSize: 6553},
	}
	for _, tc := range testCases {
		// Default output config: expect the 1 MiB flush default and
		// memory-scaled request concurrency / document buffer size.
		t.Run(fmt.Sprintf("default/%vgb", tc.memSize), func(t *testing.T) {
			runner := Runner{
				elasticsearchOutputConfig: agentconfig.NewConfig(),
				logger:                    logp.NewLogger("test"),
			}
			gotDocCfg, gotESCfg, err := runner.newDocappenderConfig(nil, tc.memSize)
			require.NoError(t, err)
			assert.Equal(t, docappender.Config{
				CompressionLevel:   5,
				FlushBytes:         1024 * 1024,
				FlushInterval:      time.Second,
				MaxRequests:        tc.wantMaxRequests,
				DocumentBufferSize: tc.wantDocBufSize,
				RequireDataStream:  true,
				Logger:             zap.New(runner.logger.Core(), zap.WithCaller(true)),
			}, gotDocCfg)
			assert.Equal(t, &elasticsearch.Config{
				Hosts:               elasticsearch.Hosts{"localhost:9200"},
				Protocol:            "http",
				Backoff:             elasticsearch.DefaultBackoffConfig,
				CompressionLevel:    5,
				Timeout:             5 * time.Second,
				MaxRetries:          3,
				MaxIdleConnsPerHost: tc.wantMaxRequests,
			}, gotESCfg)
		})
		// Explicit overrides: flush size/interval and max_requests must win
		// over the defaults and the memory-based scaling.
		t.Run(fmt.Sprintf("override/%vgb", tc.memSize), func(t *testing.T) {
			runner := Runner{
				elasticsearchOutputConfig: agentconfig.MustNewConfigFrom(map[string]interface{}{
					"flush_bytes":    "500 kib",
					"flush_interval": "2s",
					"max_requests":   50,
				}),
				logger: logp.NewLogger("test"),
			}
			gotDocCfg, gotESCfg, err := runner.newDocappenderConfig(nil, tc.memSize)
			require.NoError(t, err)
			assert.Equal(t, docappender.Config{
				CompressionLevel:   5,
				FlushBytes:         500 * 1024,
				FlushInterval:      2 * time.Second,
				MaxRequests:        50,
				DocumentBufferSize: tc.wantDocBufSize,
				RequireDataStream:  true,
				Logger:             zap.New(runner.logger.Core(), zap.WithCaller(true)),
			}, gotDocCfg)
			assert.Equal(t, &elasticsearch.Config{
				Hosts:               elasticsearch.Hosts{"localhost:9200"},
				Protocol:            "http",
				Backoff:             elasticsearch.DefaultBackoffConfig,
				CompressionLevel:    5,
				Timeout:             5 * time.Second,
				MaxRetries:          3,
				MaxIdleConnsPerHost: 50,
			}, gotESCfg)
		})
	}
}

0 comments on commit 62f2fe2

Please sign in to comment.