diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index aacf702a4db3..76bc04ede5dd 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -115,6 +115,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Ensure Elasticsearch output can always recover from network errors {pull}40794[40794] - Add `translate_ldap_attribute` processor. {pull}41472[41472] - Remove unnecessary debug logs during idle connection teardown {issue}40824[40824] +- Remove unnecessary reload for Elastic Agent managed beats when apm tracing config changes from nil to nil {pull}41794[41794] - Fix incorrect cloud provider identification in add_cloud_metadata processor using provider priority mechanism {pull}41636[41636] - Prevent panic if libbeat processors are loaded more than once. {issue}41475[41475] {pull}41857[51857] - Fix a bug where log files are rotated on startup when interval is configured and rotateonstartup is disabled {issue}41894[41894] {pull}41895[41895] diff --git a/x-pack/libbeat/management/managerV2.go b/x-pack/libbeat/management/managerV2.go index e39b394bf2b3..288f28ae0de5 100644 --- a/x-pack/libbeat/management/managerV2.go +++ b/x-pack/libbeat/management/managerV2.go @@ -893,6 +893,13 @@ func (cm *BeatV2Manager) reloadAPM(unit *agentUnit) { apmConfig = expected.APMConfig } } + + if (cm.lastAPMCfg == nil && apmConfig == nil) || (cm.lastAPMCfg != nil && gproto.Equal(cm.lastAPMCfg, apmConfig)) { + // configuration for the APM tracing did not change; do nothing + cm.logger.Debug("Skipped reloading APM tracing; configuration didn't change") + return + } + if apmConfig == nil { // APM tracing is being stopped cm.logger.Debug("Stopping APM tracing") @@ -907,12 +914,6 @@ func (cm *BeatV2Manager) reloadAPM(unit *agentUnit) { return } - if cm.lastAPMCfg != nil && gproto.Equal(cm.lastAPMCfg, apmConfig) { - // configuration for the APM tracing did not change; do nothing - cm.logger.Debug("Skipped reloading APM tracing; configuration didn't change") - return - } - uconfig, err := conf.NewConfigFrom(apmConfig) if err != nil { cm.logger.Errorf("Failed to create uconfig from APM configuration: %s", err) diff --git a/x-pack/libbeat/management/managerV2_test.go b/x-pack/libbeat/management/managerV2_test.go index f1b32b15d82d..e7515266b0bc 100644 --- a/x-pack/libbeat/management/managerV2_test.go +++ b/x-pack/libbeat/management/managerV2_test.go @@ -13,6 +13,7 @@ import ( "time" "github.com/joeshaw/multierror" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" "google.golang.org/grpc" @@ -266,6 +267,141 @@ func TestManagerV2(t *testing.T) { }, 15*time.Second, 300*time.Millisecond) } +func TestManagerV2_ReloadCount(t *testing.T) { + r := reload.NewRegistry() + + output := &reloadable{} + r.MustRegisterOutput(output) + inputs := &reloadableList{} + r.MustRegisterInput(inputs) + apm := &reloadable{} + r.MustRegisterAPM(apm) + + inputConfigUpdated := make(chan struct{}) + onObserved := func(observed *proto.CheckinObserved, currentIdx int) { + if currentIdx == 1 { + period, err := inputs.Configs()[0].Config.String("period", -1) + require.NoError(t, err) + if period == "10m" { + select { + case <-inputConfigUpdated: + default: + close(inputConfigUpdated) + } + } + } + } + + agentInfo := &proto.AgentInfo{ + Id: "elastic-agent-id", + Version: version.GetDefaultVersion(), + Snapshot: true, + } + srv := integration.NewMockServer([]*proto.CheckinExpected{ + { + AgentInfo: agentInfo, + Units: []*proto.UnitExpected{ + { + Id: "output-unit", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + Config: &proto.UnitExpectedConfig{ + Id: "default", + Type: "elasticsearch", + Name: "elasticsearch", + }, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_INFO, + }, + { + Id: "input-unit-1", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + Config: &proto.UnitExpectedConfig{ + Id: "system/metrics-system-default-system-1", + Type: "system/metrics", + Name: "system-1", + Streams: []*proto.Stream{ + { + Id: "system/metrics-system.filesystem-default-system-1", + Source: integration.RequireNewStruct(t, map[string]interface{}{ + "metricsets": []interface{}{"filesystem"}, + "period": "1m", + }), + }, + }, + }, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_INFO, + }, + }, + Features: nil, + FeaturesIdx: 1, + }, + { + AgentInfo: agentInfo, + Units: []*proto.UnitExpected{ + { + Id: "output-unit", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_INFO, + }, + { + Id: "input-unit-1", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 2, + Config: &proto.UnitExpectedConfig{ + Id: "system/metrics-system-default-system-1", + Type: "system/metrics", + Name: "system-1", + Streams: []*proto.Stream{ + { + Id: "system/metrics-system.filesystem-default-system-1", + Source: integration.RequireNewStruct(t, map[string]interface{}{ + "metricsets": []interface{}{"filesystem"}, + "period": "10m", + }), + }, + }, + }, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_INFO, + }, + }, + Features: nil, + FeaturesIdx: 1, + }, + }, + onObserved, + 500*time.Millisecond, + ) + require.NoError(t, srv.Start()) + defer srv.Stop() + + client := client.NewV2(fmt.Sprintf(":%d", srv.Port), "", client.VersionInfo{ + Name: "program", + Meta: map[string]string{ + "key": "value", + }, + }, client.WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials()))) + + m, err := NewV2AgentManagerWithClient(&Config{ + Enabled: true, + }, r, client) + require.NoError(t, err) + + err = m.Start() + require.NoError(t, err) + defer m.Stop() + + <-inputConfigUpdated + assert.Equal(t, 1, output.reloadCount) // initial load + assert.Equal(t, 2, inputs.reloadCount) // initial load + config update + assert.Equal(t, 0, apm.reloadCount) // no apm tracing config applied +} + func TestOutputError(t *testing.T) { // Uncomment the line below to see the debug logs for this test // logp.DevelopmentSetup(logp.WithLevel(logp.DebugLevel), logp.WithSelectors("*")) @@ -553,19 +689,22 @@ func TestErrorPerUnit(t *testing.T) { } type reloadable struct { - mx sync.Mutex - config *reload.ConfigWithMeta + mx sync.Mutex + config *reload.ConfigWithMeta + reloadCount int } type reloadableList struct { - mx sync.Mutex - configs []*reload.ConfigWithMeta + mx sync.Mutex + configs []*reload.ConfigWithMeta + reloadCount int } func (r *reloadable) Reload(config *reload.ConfigWithMeta) error { r.mx.Lock() defer r.mx.Unlock() r.config = config + r.reloadCount++ return nil } @@ -579,6 +718,7 @@ func (r *reloadableList) Reload(configs []*reload.ConfigWithMeta) error { r.mx.Lock() defer r.mx.Unlock() r.configs = configs + r.reloadCount++ return nil }