Skip to content

Commit

Permalink
[8.16](backport #41794) Remove ManagerV2 unnecessary reload when apm …
Browse files Browse the repository at this point in the history
…tracing config is nil (#41955)

* Remove ManagerV2 unnecessary reload when apm tracing config is nil (#41794)

Remove unnecessary reload due to apm tracing config change in ManagerV2 when apm tracing config changes from nil to nil.

(cherry picked from commit 2dbb606)

* Update CHANGELOG.next.asciidoc

---------

Co-authored-by: Carson Ip <[email protected]>
  • Loading branch information
mergify[bot] and carsonip authored Dec 9, 2024
1 parent 0ca833e commit 8f24f9e
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Support Elastic Agent control protocol chunking support {pull}37343[37343]
- Lower logging level to debug when attempting to configure beats with unknown fields from autodiscovered events/environments {pull}[37816][37816]
- Set timeout of 1 minute for FQDN requests {pull}37756[37756]
- Remove unnecessary reload for Elastic Agent managed beats when apm tracing config changes from nil to nil {pull}41794[41794]
- Fix incorrect cloud provider identification in add_cloud_metadata processor using provider priority mechanism {pull}41636[41636]

*Auditbeat*
Expand Down
13 changes: 7 additions & 6 deletions x-pack/libbeat/management/managerV2.go
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,13 @@ func (cm *BeatV2Manager) reloadAPM(unit *agentUnit) {
apmConfig = expected.APMConfig
}
}

if (cm.lastAPMCfg == nil && apmConfig == nil) || (cm.lastAPMCfg != nil && gproto.Equal(cm.lastAPMCfg, apmConfig)) {
// configuration for the APM tracing did not change; do nothing
cm.logger.Debug("Skipped reloading APM tracing; configuration didn't change")
return
}

if apmConfig == nil {
// APM tracing is being stopped
cm.logger.Debug("Stopping APM tracing")
Expand All @@ -907,12 +914,6 @@ func (cm *BeatV2Manager) reloadAPM(unit *agentUnit) {
return
}

if cm.lastAPMCfg != nil && gproto.Equal(cm.lastAPMCfg, apmConfig) {
// configuration for the APM tracing did not change; do nothing
cm.logger.Debug("Skipped reloading APM tracing; configuration didn't change")
return
}

uconfig, err := conf.NewConfigFrom(apmConfig)
if err != nil {
cm.logger.Errorf("Failed to create uconfig from APM configuration: %s", err)
Expand Down
148 changes: 144 additions & 4 deletions x-pack/libbeat/management/managerV2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/joeshaw/multierror"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
Expand Down Expand Up @@ -266,6 +267,141 @@ func TestManagerV2(t *testing.T) {
}, 15*time.Second, 300*time.Millisecond)
}

func TestManagerV2_ReloadCount(t *testing.T) {
r := reload.NewRegistry()

output := &reloadable{}
r.MustRegisterOutput(output)
inputs := &reloadableList{}
r.MustRegisterInput(inputs)
apm := &reloadable{}
r.MustRegisterAPM(apm)

inputConfigUpdated := make(chan struct{})
onObserved := func(observed *proto.CheckinObserved, currentIdx int) {
if currentIdx == 1 {
period, err := inputs.Configs()[0].Config.String("period", -1)
require.NoError(t, err)
if period == "10m" {
select {
case <-inputConfigUpdated:
default:
close(inputConfigUpdated)
}
}
}
}

agentInfo := &proto.AgentInfo{
Id: "elastic-agent-id",
Version: version.GetDefaultVersion(),
Snapshot: true,
}
srv := integration.NewMockServer([]*proto.CheckinExpected{
{
AgentInfo: agentInfo,
Units: []*proto.UnitExpected{
{
Id: "output-unit",
Type: proto.UnitType_OUTPUT,
ConfigStateIdx: 1,
Config: &proto.UnitExpectedConfig{
Id: "default",
Type: "elasticsearch",
Name: "elasticsearch",
},
State: proto.State_HEALTHY,
LogLevel: proto.UnitLogLevel_INFO,
},
{
Id: "input-unit-1",
Type: proto.UnitType_INPUT,
ConfigStateIdx: 1,
Config: &proto.UnitExpectedConfig{
Id: "system/metrics-system-default-system-1",
Type: "system/metrics",
Name: "system-1",
Streams: []*proto.Stream{
{
Id: "system/metrics-system.filesystem-default-system-1",
Source: integration.RequireNewStruct(t, map[string]interface{}{
"metricsets": []interface{}{"filesystem"},
"period": "1m",
}),
},
},
},
State: proto.State_HEALTHY,
LogLevel: proto.UnitLogLevel_INFO,
},
},
Features: nil,
FeaturesIdx: 1,
},
{
AgentInfo: agentInfo,
Units: []*proto.UnitExpected{
{
Id: "output-unit",
Type: proto.UnitType_OUTPUT,
ConfigStateIdx: 1,
State: proto.State_HEALTHY,
LogLevel: proto.UnitLogLevel_INFO,
},
{
Id: "input-unit-1",
Type: proto.UnitType_INPUT,
ConfigStateIdx: 2,
Config: &proto.UnitExpectedConfig{
Id: "system/metrics-system-default-system-1",
Type: "system/metrics",
Name: "system-1",
Streams: []*proto.Stream{
{
Id: "system/metrics-system.filesystem-default-system-1",
Source: integration.RequireNewStruct(t, map[string]interface{}{
"metricsets": []interface{}{"filesystem"},
"period": "10m",
}),
},
},
},
State: proto.State_HEALTHY,
LogLevel: proto.UnitLogLevel_INFO,
},
},
Features: nil,
FeaturesIdx: 1,
},
},
onObserved,
500*time.Millisecond,
)
require.NoError(t, srv.Start())
defer srv.Stop()

client := client.NewV2(fmt.Sprintf(":%d", srv.Port), "", client.VersionInfo{
Name: "program",
Meta: map[string]string{
"key": "value",
},
}, client.WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())))

m, err := NewV2AgentManagerWithClient(&Config{
Enabled: true,
}, r, client)
require.NoError(t, err)

err = m.Start()
require.NoError(t, err)
defer m.Stop()

<-inputConfigUpdated
assert.Equal(t, 1, output.reloadCount) // initial load
assert.Equal(t, 2, inputs.reloadCount) // initial load + config update
assert.Equal(t, 0, apm.reloadCount) // no apm tracing config applied
}

func TestOutputError(t *testing.T) {
// Uncomment the line below to see the debug logs for this test
// logp.DevelopmentSetup(logp.WithLevel(logp.DebugLevel), logp.WithSelectors("*"))
Expand Down Expand Up @@ -553,19 +689,22 @@ func TestErrorPerUnit(t *testing.T) {
}

type reloadable struct {
mx sync.Mutex
config *reload.ConfigWithMeta
mx sync.Mutex
config *reload.ConfigWithMeta
reloadCount int
}

type reloadableList struct {
mx sync.Mutex
configs []*reload.ConfigWithMeta
mx sync.Mutex
configs []*reload.ConfigWithMeta
reloadCount int
}

func (r *reloadable) Reload(config *reload.ConfigWithMeta) error {
r.mx.Lock()
defer r.mx.Unlock()
r.config = config
r.reloadCount++
return nil
}

Expand All @@ -579,6 +718,7 @@ func (r *reloadableList) Reload(configs []*reload.ConfigWithMeta) error {
r.mx.Lock()
defer r.mx.Unlock()
r.configs = configs
r.reloadCount++
return nil
}

Expand Down

0 comments on commit 8f24f9e

Please sign in to comment.