From a8306c97b140adc04e034ac403b1b687c68fd795 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Tue, 5 Nov 2024 19:48:48 +0530 Subject: [PATCH] [cmd/opampsupervisor]: Supervisor waits for configurable healthchecks to report remote config status (#34907) **Description:** This pull request addresses the remote config status reporting issue discussed in #21079 by introducing the following options to the Agent config: 1. `config_apply_timeout`: config update is successful if we receive a healthy status and then observe no failure updates for the entire duration of the timeout period; otherwise, failure is reported. **Link to tracking Issue:** #21079 **Testing:** Added e2e test **Documentation:** --- ...pamp_21079_configurable_health_checks.yaml | 27 ++++ cmd/opampsupervisor/e2e_test.go | 127 +++++++++++++++++- .../supervisor/config/config.go | 6 + .../supervisor/config/config_test.go | 41 ++++++ cmd/opampsupervisor/supervisor/supervisor.go | 84 +++++++++--- .../supervisor/supervisor_test.go | 4 +- .../supervisor/supervisor_report_status.yaml | 19 +++ 7 files changed, 283 insertions(+), 25 deletions(-) create mode 100644 .chloggen/opamp_21079_configurable_health_checks.yaml create mode 100644 cmd/opampsupervisor/testdata/supervisor/supervisor_report_status.yaml diff --git a/.chloggen/opamp_21079_configurable_health_checks.yaml b/.chloggen/opamp_21079_configurable_health_checks.yaml new file mode 100644 index 000000000000..89681006555d --- /dev/null +++ b/.chloggen/opamp_21079_configurable_health_checks.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: opampsupervisor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Supervisor waits for configurable healthchecks to report remote config status. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [21079] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] \ No newline at end of file diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index 1fde97d26a73..c0e080faa1e2 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -162,7 +162,10 @@ func newSupervisor(t *testing.T, configType string, extraConfigData map[string]s cfg, err := config.Load(cfgFile.Name()) require.NoError(t, err) - s, err := supervisor.NewSupervisor(zap.NewNop(), cfg) + logger, err := zap.NewDevelopment() + require.NoError(t, err) + + s, err := supervisor.NewSupervisor(logger, cfg) require.NoError(t, err) return s @@ -1443,6 +1446,128 @@ func TestSupervisorLogging(t *testing.T) { require.NoError(t, logFile.Close()) } +func TestSupervisorRemoteConfigApplyStatus(t *testing.T) { + var agentConfig atomic.Value + var healthReport atomic.Value + var remoteConfigStatus atomic.Value + server := newOpAMPServer( + t, + defaultConnectingHandler, + server.ConnectionCallbacksStruct{ + OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + if message.EffectiveConfig != nil { + config := message.EffectiveConfig.ConfigMap.ConfigMap[""] + if config != nil { + agentConfig.Store(string(config.Body)) + } + } + if message.Health != nil { + healthReport.Store(message.Health) + } + if message.RemoteConfigStatus != nil { + remoteConfigStatus.Store(message.RemoteConfigStatus) + } + + return &protobufs.ServerToAgent{} + }, + }) + + s := newSupervisor(t, "report_status", map[string]string{ + "url": server.addr, + "config_apply_timeout": "3s", + }) + require.Nil(t, s.Start()) + defer s.Shutdown() + + waitForSupervisorConnection(server.supervisorConnected, true) + + cfg, hash, inputFile, outputFile := createSimplePipelineCollectorConf(t) + + server.sendToSupervisor(&protobufs.ServerToAgent{ + RemoteConfig: &protobufs.AgentRemoteConfig{ + Config: &protobufs.AgentConfigMap{ + ConfigMap: map[string]*protobufs.AgentConfigFile{ + "": {Body: cfg.Bytes()}, + }, + }, + ConfigHash: hash, + }, + }) + + // Check that the status is set to APPLYING + require.Eventually(t, func() bool { + status, ok := remoteConfigStatus.Load().(*protobufs.RemoteConfigStatus) + return ok && status.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING + }, 5*time.Second, 100*time.Millisecond, "Remote config status was not set to APPLYING") + + // Wait for collector to become healthy + require.Eventually(t, func() bool { + health, ok := healthReport.Load().(*protobufs.ComponentHealth) + return ok && health.Healthy + }, 10*time.Second, 10*time.Millisecond, "Collector did not become healthy") + + // Check that the status is set to APPLIED + require.Eventually(t, func() bool { + status, ok := remoteConfigStatus.Load().(*protobufs.RemoteConfigStatus) + return ok && status.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED + }, 5*time.Second, 10*time.Millisecond, "Remote config status was not set to APPLIED") + + require.Eventually(t, func() bool { + cfg, ok := agentConfig.Load().(string) + if ok { + // The effective config may be structurally different compared to what was sent, + // and will also have some data redacted, + // so just check that it includes the filelog receiver + return strings.Contains(cfg, "filelog") + } + + return false + }, 5*time.Second, 10*time.Millisecond, "Collector was not started with remote config") + + n, err := inputFile.WriteString("{\"body\":\"hello, world\"}\n") + require.NotZero(t, n, "Could not write to input file") + require.NoError(t, err) + + require.Eventually(t, func() bool { + logRecord := make([]byte, 1024) + n, _ := outputFile.Read(logRecord) + + return n != 0 + }, 10*time.Second, 100*time.Millisecond, "Log never appeared in output") + + // Test with bad configuration + badCfg, badHash := createBadCollectorConf(t) + + server.sendToSupervisor(&protobufs.ServerToAgent{ + RemoteConfig: &protobufs.AgentRemoteConfig{ + Config: &protobufs.AgentConfigMap{ + ConfigMap: map[string]*protobufs.AgentConfigFile{ + "": {Body: badCfg.Bytes()}, + }, + }, + ConfigHash: badHash, + }, + }) + + // Check that the status is set to APPLYING + require.Eventually(t, func() bool { + status, ok := remoteConfigStatus.Load().(*protobufs.RemoteConfigStatus) + return ok && status.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING + }, 5*time.Second, 200*time.Millisecond, "Remote config status was not set to APPLYING for bad config") + + // Wait for the health checks to fail + require.Eventually(t, func() bool { + health, ok := healthReport.Load().(*protobufs.ComponentHealth) + return ok && !health.Healthy + }, 30*time.Second, 100*time.Millisecond, "Collector did not become unhealthy with bad config") + + // Check that the status is set to FAILED after failed health checks + require.Eventually(t, func() bool { + status, ok := remoteConfigStatus.Load().(*protobufs.RemoteConfigStatus) + return ok && status.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED + }, 15*time.Second, 100*time.Millisecond, "Remote config status was not set to FAILED for bad config") +} + func TestSupervisorOpAmpServerPort(t *testing.T) { var agentConfig atomic.Value server := newOpAMPServer( diff --git a/cmd/opampsupervisor/supervisor/config/config.go b/cmd/opampsupervisor/supervisor/config/config.go index 2b77cc56bd62..68c99fa4c755 100644 --- a/cmd/opampsupervisor/supervisor/config/config.go +++ b/cmd/opampsupervisor/supervisor/config/config.go @@ -153,6 +153,7 @@ type Agent struct { Executable string OrphanDetectionInterval time.Duration `mapstructure:"orphan_detection_interval"` Description AgentDescription `mapstructure:"description"` + ConfigApplyTimeout time.Duration `mapstructure:"config_apply_timeout"` BootstrapTimeout time.Duration `mapstructure:"bootstrap_timeout"` HealthCheckPort int `mapstructure:"health_check_port"` OpAMPServerPort int `mapstructure:"opamp_server_port"` @@ -185,6 +186,10 @@ func (a Agent) Validate() error { return fmt.Errorf("could not stat agent::executable path: %w", err) } + if a.ConfigApplyTimeout <= 0 { + return errors.New("agent::config_apply_timeout must be valid duration") + } + return nil } @@ -234,6 +239,7 @@ func DefaultSupervisor() Supervisor { }, Agent: Agent{ OrphanDetectionInterval: 5 * time.Second, + ConfigApplyTimeout: 5 * time.Second, BootstrapTimeout: 3 * time.Second, PassthroughLogs: false, }, diff --git a/cmd/opampsupervisor/supervisor/config/config_test.go b/cmd/opampsupervisor/supervisor/config/config_test.go index 9616c9da52d5..83e44b7b7a32 100644 --- a/cmd/opampsupervisor/supervisor/config/config_test.go +++ b/cmd/opampsupervisor/supervisor/config/config_test.go @@ -36,6 +36,7 @@ func TestValidate(t *testing.T) { Agent: Agent{ Executable: "${file_path}", OrphanDetectionInterval: 5 * time.Second, + ConfigApplyTimeout: 2 * time.Second, BootstrapTimeout: 5 * time.Second, }, Capabilities: Capabilities{ @@ -59,6 +60,7 @@ func TestValidate(t *testing.T) { }, Agent: Agent{ Executable: "${file_path}", + ConfigApplyTimeout: 2 * time.Second, OrphanDetectionInterval: 5 * time.Second, }, Capabilities: Capabilities{ @@ -84,6 +86,7 @@ func TestValidate(t *testing.T) { }, Agent: Agent{ Executable: "${file_path}", + ConfigApplyTimeout: 2 * time.Second, OrphanDetectionInterval: 5 * time.Second, }, Capabilities: Capabilities{ @@ -109,6 +112,7 @@ func TestValidate(t *testing.T) { }, Agent: Agent{ Executable: "${file_path}", + ConfigApplyTimeout: 2 * time.Second, OrphanDetectionInterval: 5 * time.Second, }, Capabilities: Capabilities{ @@ -138,6 +142,7 @@ func TestValidate(t *testing.T) { }, Agent: Agent{ Executable: "${file_path}", + ConfigApplyTimeout: 2 * time.Second, OrphanDetectionInterval: 5 * time.Second, }, Capabilities: Capabilities{ @@ -164,6 +169,7 @@ func TestValidate(t *testing.T) { Agent: Agent{ Executable: "", OrphanDetectionInterval: 5 * time.Second, + ConfigApplyTimeout: 2 * time.Second, BootstrapTimeout: 5 * time.Second, }, Capabilities: Capabilities{ @@ -190,6 +196,7 @@ func TestValidate(t *testing.T) { Agent: Agent{ Executable: "./path/does/not/exist", OrphanDetectionInterval: 5 * time.Second, + ConfigApplyTimeout: 2 * time.Second, BootstrapTimeout: 5 * time.Second, }, Capabilities: Capabilities{ @@ -215,6 +222,7 @@ func TestValidate(t *testing.T) { }, Agent: Agent{ Executable: "${file_path}", + ConfigApplyTimeout: 2 * time.Second, OrphanDetectionInterval: -1, }, Capabilities: Capabilities{ @@ -242,6 +250,7 @@ func TestValidate(t *testing.T) { Executable: "${file_path}", OrphanDetectionInterval: 5 * time.Second, HealthCheckPort: 65536, + ConfigApplyTimeout: 2 * time.Second, BootstrapTimeout: 5 * time.Second, }, Capabilities: Capabilities{ @@ -269,6 +278,7 @@ func TestValidate(t *testing.T) { Executable: "${file_path}", OrphanDetectionInterval: 5 * time.Second, HealthCheckPort: 0, + ConfigApplyTimeout: 2 * time.Second, BootstrapTimeout: 5 * time.Second, }, Capabilities: Capabilities{ @@ -295,6 +305,7 @@ func TestValidate(t *testing.T) { Executable: "${file_path}", OrphanDetectionInterval: 5 * time.Second, HealthCheckPort: 29848, + ConfigApplyTimeout: 2 * time.Second, BootstrapTimeout: 5 * time.Second, }, Capabilities: Capabilities{ @@ -320,6 +331,7 @@ func TestValidate(t *testing.T) { Agent: Agent{ Executable: "${file_path}", OrphanDetectionInterval: 5 * time.Second, + ConfigApplyTimeout: 2 * time.Second, BootstrapTimeout: -5 * time.Second, }, Capabilities: Capabilities{ @@ -343,6 +355,7 @@ func TestValidate(t *testing.T) { Agent: Agent{ Executable: "${file_path}", OrphanDetectionInterval: 5 * time.Second, + ConfigApplyTimeout: 2 * time.Second, OpAMPServerPort: 65536, BootstrapTimeout: 5 * time.Second, }, @@ -367,6 +380,7 @@ func TestValidate(t *testing.T) { Agent: Agent{ Executable: "${file_path}", OrphanDetectionInterval: 5 * time.Second, + ConfigApplyTimeout: 2 * time.Second, OpAMPServerPort: 0, BootstrapTimeout: 5 * time.Second, }, @@ -378,6 +392,33 @@ func TestValidate(t *testing.T) { }, }, }, + { + name: "Invalid config apply timeout", + config: Supervisor{ + Server: OpAMPServer{ + Endpoint: "wss://localhost:9090/opamp", + Headers: http.Header{ + "Header1": []string{"HeaderValue"}, + }, + TLSSetting: configtls.ClientConfig{ + Insecure: true, + }, + }, + Agent: Agent{ + Executable: "${file_path}", + OrphanDetectionInterval: 5 * time.Second, + OpAMPServerPort: 8080, + BootstrapTimeout: 5 * time.Second, + }, + Capabilities: Capabilities{ + AcceptsRemoteConfig: true, + }, + Storage: Storage{ + Directory: "/etc/opamp-supervisor/storage", + }, + }, + expectedError: "agent::config_apply_timeout must be valid duration", + }, } // create some fake files for validating agent config diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index 48f3439413d3..b2f17d7494ca 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -129,6 +129,13 @@ type Supervisor struct { // A channel to indicate there is a new config to apply. hasNewConfig chan struct{} + // configApplyTimeout is the maximum time to wait for the agent to apply a new config. + // After this time passes without the agent reporting health as OK, the agent is considered unhealthy. + configApplyTimeout time.Duration + // lastHealthFromClient is the last health status of the agent received from the client. + lastHealthFromClient *protobufs.ComponentHealth + // lastHealth is the last health status of the agent. + lastHealth *protobufs.ComponentHealth // The OpAMP client to connect to the OpAMP Server. opampClient client.OpAMPClient @@ -139,9 +146,12 @@ type Supervisor struct { customMessageToServer chan *protobufs.CustomMessage customMessageWG sync.WaitGroup - agentHasStarted bool + // agentHasStarted is true if the agent has started. + agentHasStarted bool + // agentStartHealthCheckAttempts is the number of health check attempts made by the agent since it started. agentStartHealthCheckAttempts int - agentRestarting atomic.Bool + // agentRestarting is true if the agent is restarting. + agentRestarting atomic.Bool // The OpAMP server to communicate with the Collector's OpAMP extension opampServer server.OpAMPServer @@ -174,6 +184,8 @@ func NewSupervisor(logger *zap.Logger, cfg config.Supervisor) (*Supervisor, erro return nil, fmt.Errorf("error creating storage dir: %w", err) } + s.configApplyTimeout = s.config.Agent.ConfigApplyTimeout + return s, nil } @@ -528,6 +540,11 @@ func (s *Supervisor) handleAgentOpAMPMessage(conn serverTypes.Connection, messag ) } } + + if message.Health != nil { + s.logger.Debug("Received health status from agent", zap.Bool("healthy", message.Health.Healthy)) + s.lastHealthFromClient = message.Health + } } func (s *Supervisor) forwardCustomMessagesToServerLoop() { @@ -1031,11 +1048,6 @@ func (s *Supervisor) healthCheck() { err := s.healthChecker.Check(ctx) cancel() - if errors.Is(err, s.lastHealthCheckErr) { - // No difference from last check. Nothing new to report. - return - } - // Prepare OpAMP health report. health := &protobufs.ComponentHealth{ StartTimeUnixNano: uint64(s.startedAt.UnixNano()), @@ -1046,6 +1058,10 @@ func (s *Supervisor) healthCheck() { if !s.agentHasStarted && s.agentStartHealthCheckAttempts < 10 { health.LastError = "Agent is starting" s.agentStartHealthCheckAttempts++ + // if we have a last health status, use it + if s.lastHealth != nil && s.lastHealth.Healthy { + health.Healthy = s.lastHealth.Healthy + } } else { health.LastError = err.Error() s.logger.Error("Agent is not healthy", zap.Error(err)) @@ -1055,6 +1071,12 @@ func (s *Supervisor) healthCheck() { health.Healthy = true s.logger.Debug("Agent is healthy.") } + s.lastHealth = health + + if err != nil && errors.Is(err, s.lastHealthCheckErr) { + // No difference from last check. Nothing new to report. + return + } // Report via OpAMP. if err2 := s.opampClient.SetHealth(health); err2 != nil { @@ -1075,9 +1097,21 @@ func (s *Supervisor) runAgentProcess() { restartTimer := time.NewTimer(0) restartTimer.Stop() + configApplyTimeoutTimer := time.NewTimer(0) + configApplyTimeoutTimer.Stop() + for { select { case <-s.hasNewConfig: + s.lastHealthFromClient = nil + if !configApplyTimeoutTimer.Stop() { + select { + case <-configApplyTimeoutTimer.C: // Try to drain the channel + default: + } + } + configApplyTimeoutTimer.Reset(s.config.Agent.ConfigApplyTimeout) + s.logger.Debug("Restarting agent due to new config") restartTimer.Stop() s.stopAgentApplyConfig() @@ -1116,6 +1150,13 @@ func (s *Supervisor) runAgentProcess() { s.logger.Debug("Agent starting after start backoff") s.startAgent() + case <-configApplyTimeoutTimer.C: + if s.lastHealthFromClient == nil || !s.lastHealthFromClient.Healthy { + s.reportConfigStatus(protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED, "Config apply timeout exceeded") + } else { + s.reportConfigStatus(protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED, "") + } + case <-s.healthCheckTicker.C: s.healthCheck() @@ -1205,6 +1246,17 @@ func (s *Supervisor) saveLastReceivedOwnTelemetrySettings(set *protobufs.Telemet return os.WriteFile(filepath.Join(s.config.Storage.Directory, filePath), cfg, 0600) } +func (s *Supervisor) reportConfigStatus(status protobufs.RemoteConfigStatuses, errorMessage string) { + err := s.opampClient.SetRemoteConfigStatus(&protobufs.RemoteConfigStatus{ + LastRemoteConfigHash: s.remoteConfig.ConfigHash, + Status: status, + ErrorMessage: errorMessage, + }) + if err != nil { + s.logger.Error("Could not report OpAMP remote config status", zap.Error(err)) + } +} + func (s *Supervisor) onMessage(ctx context.Context, msg *types.MessageData) { configChanged := false @@ -1222,6 +1274,7 @@ func (s *Supervisor) onMessage(ctx context.Context, msg *types.MessageData) { // Update the agent config if any messages have touched the config if configChanged { + err := s.opampClient.UpdateEffectiveConfig(ctx) if err != nil { s.logger.Error("The OpAMP client failed to update the effective config", zap.Error(err)) @@ -1276,22 +1329,9 @@ func (s *Supervisor) processRemoteConfigMessage(msg *protobufs.AgentRemoteConfig configChanged, err := s.composeMergedConfig(s.remoteConfig) if err != nil { s.logger.Error("Error composing merged config. Reporting failed remote config status.", zap.Error(err)) - err = s.opampClient.SetRemoteConfigStatus(&protobufs.RemoteConfigStatus{ - LastRemoteConfigHash: msg.ConfigHash, - Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED, - ErrorMessage: err.Error(), - }) - if err != nil { - s.logger.Error("Could not report failed OpAMP remote config status", zap.Error(err)) - } + s.reportConfigStatus(protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED, err.Error()) } else { - err = s.opampClient.SetRemoteConfigStatus(&protobufs.RemoteConfigStatus{ - LastRemoteConfigHash: msg.ConfigHash, - Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED, - }) - if err != nil { - s.logger.Error("Could not report applied OpAMP remote config status", zap.Error(err)) - } + s.reportConfigStatus(protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING, "") } return configChanged diff --git a/cmd/opampsupervisor/supervisor/supervisor_test.go b/cmd/opampsupervisor/supervisor/supervisor_test.go index 6185ff43c1c1..e9013978fa9e 100644 --- a/cmd/opampsupervisor/supervisor/supervisor_test.go +++ b/cmd/opampsupervisor/supervisor/supervisor_test.go @@ -417,7 +417,7 @@ service: t, &protobufs.RemoteConfigStatus{ LastRemoteConfigHash: remoteConfig.ConfigHash, - Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED, + Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING, }, rcs, ) @@ -516,7 +516,7 @@ service: t, &protobufs.RemoteConfigStatus{ LastRemoteConfigHash: remoteConfig.ConfigHash, - Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED, + Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING, }, rcs, ) diff --git a/cmd/opampsupervisor/testdata/supervisor/supervisor_report_status.yaml b/cmd/opampsupervisor/testdata/supervisor/supervisor_report_status.yaml new file mode 100644 index 000000000000..9beaacec6216 --- /dev/null +++ b/cmd/opampsupervisor/testdata/supervisor/supervisor_report_status.yaml @@ -0,0 +1,19 @@ +server: + endpoint: ws://{{.url}}/v1/opamp + tls: + insecure: true + +capabilities: + reports_effective_config: true + reports_own_metrics: true + reports_health: true + accepts_remote_config: true + reports_remote_config: true + accepts_restart_command: true + +storage: + directory: "{{.storage_dir}}" + +agent: + executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}} + config_apply_timeout: {{.config_apply_timeout}}