Skip to content

Commit

Permalink
Remove ManagerV2 unnecessary reload when apm tracing config is nil (#…
Browse files Browse the repository at this point in the history
…41794)

Remove unnecessary reload due to apm tracing config change in ManagerV2 when apm tracing config changes from nil to nil.
  • Loading branch information
carsonip authored Dec 9, 2024
1 parent a641687 commit 2dbb606
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Ensure Elasticsearch output can always recover from network errors {pull}40794[40794]
- Add `translate_ldap_attribute` processor. {pull}41472[41472]
- Remove unnecessary debug logs during idle connection teardown {issue}40824[40824]
- Remove unnecessary reload for Elastic Agent managed beats when apm tracing config changes from nil to nil {pull}41794[41794]
- Fix incorrect cloud provider identification in add_cloud_metadata processor using provider priority mechanism {pull}41636[41636]
- Prevent panic if libbeat processors are loaded more than once. {issue}41475[41475] {pull}41857[51857]
- Fix a bug where log files are rotated on startup when interval is configured and rotateonstartup is disabled {issue}41894[41894] {pull}41895[41895]
Expand Down
13 changes: 7 additions & 6 deletions x-pack/libbeat/management/managerV2.go
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,13 @@ func (cm *BeatV2Manager) reloadAPM(unit *agentUnit) {
apmConfig = expected.APMConfig
}
}

if (cm.lastAPMCfg == nil && apmConfig == nil) || (cm.lastAPMCfg != nil && gproto.Equal(cm.lastAPMCfg, apmConfig)) {
// configuration for the APM tracing did not change; do nothing
cm.logger.Debug("Skipped reloading APM tracing; configuration didn't change")
return
}

if apmConfig == nil {
// APM tracing is being stopped
cm.logger.Debug("Stopping APM tracing")
Expand All @@ -907,12 +914,6 @@ func (cm *BeatV2Manager) reloadAPM(unit *agentUnit) {
return
}

if cm.lastAPMCfg != nil && gproto.Equal(cm.lastAPMCfg, apmConfig) {
// configuration for the APM tracing did not change; do nothing
cm.logger.Debug("Skipped reloading APM tracing; configuration didn't change")
return
}

uconfig, err := conf.NewConfigFrom(apmConfig)
if err != nil {
cm.logger.Errorf("Failed to create uconfig from APM configuration: %s", err)
Expand Down
148 changes: 144 additions & 4 deletions x-pack/libbeat/management/managerV2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/joeshaw/multierror"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
Expand Down Expand Up @@ -266,6 +267,141 @@ func TestManagerV2(t *testing.T) {
}, 15*time.Second, 300*time.Millisecond)
}

func TestManagerV2_ReloadCount(t *testing.T) {
r := reload.NewRegistry()

output := &reloadable{}
r.MustRegisterOutput(output)
inputs := &reloadableList{}
r.MustRegisterInput(inputs)
apm := &reloadable{}
r.MustRegisterAPM(apm)

inputConfigUpdated := make(chan struct{})
onObserved := func(observed *proto.CheckinObserved, currentIdx int) {
if currentIdx == 1 {
period, err := inputs.Configs()[0].Config.String("period", -1)
require.NoError(t, err)
if period == "10m" {
select {
case <-inputConfigUpdated:
default:
close(inputConfigUpdated)
}
}
}
}

agentInfo := &proto.AgentInfo{
Id: "elastic-agent-id",
Version: version.GetDefaultVersion(),
Snapshot: true,
}
srv := integration.NewMockServer([]*proto.CheckinExpected{
{
AgentInfo: agentInfo,
Units: []*proto.UnitExpected{
{
Id: "output-unit",
Type: proto.UnitType_OUTPUT,
ConfigStateIdx: 1,
Config: &proto.UnitExpectedConfig{
Id: "default",
Type: "elasticsearch",
Name: "elasticsearch",
},
State: proto.State_HEALTHY,
LogLevel: proto.UnitLogLevel_INFO,
},
{
Id: "input-unit-1",
Type: proto.UnitType_INPUT,
ConfigStateIdx: 1,
Config: &proto.UnitExpectedConfig{
Id: "system/metrics-system-default-system-1",
Type: "system/metrics",
Name: "system-1",
Streams: []*proto.Stream{
{
Id: "system/metrics-system.filesystem-default-system-1",
Source: integration.RequireNewStruct(t, map[string]interface{}{
"metricsets": []interface{}{"filesystem"},
"period": "1m",
}),
},
},
},
State: proto.State_HEALTHY,
LogLevel: proto.UnitLogLevel_INFO,
},
},
Features: nil,
FeaturesIdx: 1,
},
{
AgentInfo: agentInfo,
Units: []*proto.UnitExpected{
{
Id: "output-unit",
Type: proto.UnitType_OUTPUT,
ConfigStateIdx: 1,
State: proto.State_HEALTHY,
LogLevel: proto.UnitLogLevel_INFO,
},
{
Id: "input-unit-1",
Type: proto.UnitType_INPUT,
ConfigStateIdx: 2,
Config: &proto.UnitExpectedConfig{
Id: "system/metrics-system-default-system-1",
Type: "system/metrics",
Name: "system-1",
Streams: []*proto.Stream{
{
Id: "system/metrics-system.filesystem-default-system-1",
Source: integration.RequireNewStruct(t, map[string]interface{}{
"metricsets": []interface{}{"filesystem"},
"period": "10m",
}),
},
},
},
State: proto.State_HEALTHY,
LogLevel: proto.UnitLogLevel_INFO,
},
},
Features: nil,
FeaturesIdx: 1,
},
},
onObserved,
500*time.Millisecond,
)
require.NoError(t, srv.Start())
defer srv.Stop()

client := client.NewV2(fmt.Sprintf(":%d", srv.Port), "", client.VersionInfo{
Name: "program",
Meta: map[string]string{
"key": "value",
},
}, client.WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())))

m, err := NewV2AgentManagerWithClient(&Config{
Enabled: true,
}, r, client)
require.NoError(t, err)

err = m.Start()
require.NoError(t, err)
defer m.Stop()

<-inputConfigUpdated
assert.Equal(t, 1, output.reloadCount) // initial load
assert.Equal(t, 2, inputs.reloadCount) // initial load + config update
assert.Equal(t, 0, apm.reloadCount) // no apm tracing config applied
}

func TestOutputError(t *testing.T) {
// Uncomment the line below to see the debug logs for this test
// logp.DevelopmentSetup(logp.WithLevel(logp.DebugLevel), logp.WithSelectors("*"))
Expand Down Expand Up @@ -553,19 +689,22 @@ func TestErrorPerUnit(t *testing.T) {
}

type reloadable struct {
mx sync.Mutex
config *reload.ConfigWithMeta
mx sync.Mutex
config *reload.ConfigWithMeta
reloadCount int
}

type reloadableList struct {
mx sync.Mutex
configs []*reload.ConfigWithMeta
mx sync.Mutex
configs []*reload.ConfigWithMeta
reloadCount int
}

func (r *reloadable) Reload(config *reload.ConfigWithMeta) error {
r.mx.Lock()
defer r.mx.Unlock()
r.config = config
r.reloadCount++
return nil
}

Expand All @@ -579,6 +718,7 @@ func (r *reloadableList) Reload(configs []*reload.ConfigWithMeta) error {
r.mx.Lock()
defer r.mx.Unlock()
r.configs = configs
r.reloadCount++
return nil
}

Expand Down

0 comments on commit 2dbb606

Please sign in to comment.