Skip to content

Commit

Permalink
[cmd/opampsupervisor]: Supervisor waits for configurable healthchecks…
Browse files Browse the repository at this point in the history
… to report remote config status (open-telemetry#34907)

**Description:** 

This pull request addresses the remote config status reporting issue
discussed in open-telemetry#21079 by introducing the following option to the Agent
config:

1. `config_apply_timeout`: a config update is considered successful if a
healthy status is received and no failure updates are then observed for the
entire duration of the timeout period; otherwise, a failure is reported.

**Link to tracking Issue:** open-telemetry#21079

**Testing:** Added e2e test

**Documentation:** <Describe the documentation added.>
  • Loading branch information
srikanthccv authored and sbylica-splunk committed Dec 17, 2024
1 parent c96a8ab commit a8306c9
Show file tree
Hide file tree
Showing 7 changed files with 283 additions and 25 deletions.
27 changes: 27 additions & 0 deletions .chloggen/opamp_21079_configurable_health_checks.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: opampsupervisor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Supervisor waits for configurable healthchecks to report remote config status.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [21079]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
127 changes: 126 additions & 1 deletion cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,10 @@ func newSupervisor(t *testing.T, configType string, extraConfigData map[string]s
cfg, err := config.Load(cfgFile.Name())
require.NoError(t, err)

s, err := supervisor.NewSupervisor(zap.NewNop(), cfg)
logger, err := zap.NewDevelopment()
require.NoError(t, err)

s, err := supervisor.NewSupervisor(logger, cfg)
require.NoError(t, err)

return s
Expand Down Expand Up @@ -1443,6 +1446,128 @@ func TestSupervisorLogging(t *testing.T) {
require.NoError(t, logFile.Close())
}

// TestSupervisorRemoteConfigApplyStatus checks the remote config status that
// the supervisor reports to the OpAMP server while it applies remote
// configuration.
//
// The test runs two phases against a supervisor started with a
// config_apply_timeout of 3s:
//
//  1. A simple pipeline config is sent. The server must observe the status
//     APPLYING, then a healthy report, then the status APPLIED, an effective
//     config mentioning the filelog receiver, and finally data flowing from
//     the input file to the output file.
//  2. A bad config is sent. The server must observe the status APPLYING, then
//     an unhealthy report, then the status FAILED.
func TestSupervisorRemoteConfigApplyStatus(t *testing.T) {
	// Most recent values seen by the server callback. They are written by the
	// callback via Store and read by the polling checks below via Load.
	var agentConfig atomic.Value
	var healthReport atomic.Value
	var remoteConfigStatus atomic.Value
	server := newOpAMPServer(
		t,
		defaultConnectingHandler,
		server.ConnectionCallbacksStruct{
			OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
				if message.EffectiveConfig != nil {
					config := message.EffectiveConfig.ConfigMap.ConfigMap[""]
					if config != nil {
						agentConfig.Store(string(config.Body))
					}
				}
				if message.Health != nil {
					healthReport.Store(message.Health)
				}
				if message.RemoteConfigStatus != nil {
					remoteConfigStatus.Store(message.RemoteConfigStatus)
				}

				return &protobufs.ServerToAgent{}
			},
		})

	s := newSupervisor(t, "report_status", map[string]string{
		"url":                  server.addr,
		"config_apply_timeout": "3s",
	})
	// Start returns an error value, so assert on it with NoError rather than Nil.
	require.NoError(t, s.Start())
	defer s.Shutdown()

	waitForSupervisorConnection(server.supervisorConnected, true)

	cfg, hash, inputFile, outputFile := createSimplePipelineCollectorConf(t)

	server.sendToSupervisor(&protobufs.ServerToAgent{
		RemoteConfig: &protobufs.AgentRemoteConfig{
			Config: &protobufs.AgentConfigMap{
				ConfigMap: map[string]*protobufs.AgentConfigFile{
					"": {Body: cfg.Bytes()},
				},
			},
			ConfigHash: hash,
		},
	})

	// Check that the status is set to APPLYING
	require.Eventually(t, func() bool {
		status, ok := remoteConfigStatus.Load().(*protobufs.RemoteConfigStatus)
		return ok && status.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING
	}, 5*time.Second, 100*time.Millisecond, "Remote config status was not set to APPLYING")

	// Wait for collector to become healthy
	require.Eventually(t, func() bool {
		health, ok := healthReport.Load().(*protobufs.ComponentHealth)
		return ok && health.Healthy
	}, 10*time.Second, 10*time.Millisecond, "Collector did not become healthy")

	// Check that the status is set to APPLIED
	require.Eventually(t, func() bool {
		status, ok := remoteConfigStatus.Load().(*protobufs.RemoteConfigStatus)
		return ok && status.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED
	}, 5*time.Second, 10*time.Millisecond, "Remote config status was not set to APPLIED")

	require.Eventually(t, func() bool {
		effective, ok := agentConfig.Load().(string)
		if !ok {
			return false
		}

		// The effective config may be structurally different compared to what was sent,
		// and will also have some data redacted,
		// so just check that it includes the filelog receiver
		return strings.Contains(effective, "filelog")
	}, 5*time.Second, 10*time.Millisecond, "Collector was not started with remote config")

	// Check the write error before the byte count so that a failed write is
	// reported with its actual cause.
	n, err := inputFile.WriteString("{\"body\":\"hello, world\"}\n")
	require.NoError(t, err)
	require.NotZero(t, n, "Could not write to input file")

	require.Eventually(t, func() bool {
		logRecord := make([]byte, 1024)
		// The read error is deliberately ignored: only the presence of any
		// output matters, and the read is retried until the deadline.
		read, _ := outputFile.Read(logRecord)

		return read != 0
	}, 10*time.Second, 100*time.Millisecond, "Log never appeared in output")

	// Test with bad configuration
	badCfg, badHash := createBadCollectorConf(t)

	server.sendToSupervisor(&protobufs.ServerToAgent{
		RemoteConfig: &protobufs.AgentRemoteConfig{
			Config: &protobufs.AgentConfigMap{
				ConfigMap: map[string]*protobufs.AgentConfigFile{
					"": {Body: badCfg.Bytes()},
				},
			},
			ConfigHash: badHash,
		},
	})

	// Check that the status is set to APPLYING
	require.Eventually(t, func() bool {
		status, ok := remoteConfigStatus.Load().(*protobufs.RemoteConfigStatus)
		return ok && status.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING
	}, 5*time.Second, 200*time.Millisecond, "Remote config status was not set to APPLYING for bad config")

	// Wait for the health checks to fail
	require.Eventually(t, func() bool {
		health, ok := healthReport.Load().(*protobufs.ComponentHealth)
		return ok && !health.Healthy
	}, 30*time.Second, 100*time.Millisecond, "Collector did not become unhealthy with bad config")

	// Check that the status is set to FAILED after failed health checks
	require.Eventually(t, func() bool {
		status, ok := remoteConfigStatus.Load().(*protobufs.RemoteConfigStatus)
		return ok && status.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED
	}, 15*time.Second, 100*time.Millisecond, "Remote config status was not set to FAILED for bad config")
}

func TestSupervisorOpAmpServerPort(t *testing.T) {
var agentConfig atomic.Value
server := newOpAMPServer(
Expand Down
6 changes: 6 additions & 0 deletions cmd/opampsupervisor/supervisor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ type Agent struct {
Executable string
OrphanDetectionInterval time.Duration `mapstructure:"orphan_detection_interval"`
Description AgentDescription `mapstructure:"description"`
ConfigApplyTimeout time.Duration `mapstructure:"config_apply_timeout"`
BootstrapTimeout time.Duration `mapstructure:"bootstrap_timeout"`
HealthCheckPort int `mapstructure:"health_check_port"`
OpAMPServerPort int `mapstructure:"opamp_server_port"`
Expand Down Expand Up @@ -185,6 +186,10 @@ func (a Agent) Validate() error {
return fmt.Errorf("could not stat agent::executable path: %w", err)
}

if a.ConfigApplyTimeout <= 0 {
return errors.New("agent::config_apply_timeout must be valid duration")
}

return nil
}

Expand Down Expand Up @@ -234,6 +239,7 @@ func DefaultSupervisor() Supervisor {
},
Agent: Agent{
OrphanDetectionInterval: 5 * time.Second,
ConfigApplyTimeout: 5 * time.Second,
BootstrapTimeout: 3 * time.Second,
PassthroughLogs: false,
},
Expand Down
41 changes: 41 additions & 0 deletions cmd/opampsupervisor/supervisor/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestValidate(t *testing.T) {
Agent: Agent{
Executable: "${file_path}",
OrphanDetectionInterval: 5 * time.Second,
ConfigApplyTimeout: 2 * time.Second,
BootstrapTimeout: 5 * time.Second,
},
Capabilities: Capabilities{
Expand All @@ -59,6 +60,7 @@ func TestValidate(t *testing.T) {
},
Agent: Agent{
Executable: "${file_path}",
ConfigApplyTimeout: 2 * time.Second,
OrphanDetectionInterval: 5 * time.Second,
},
Capabilities: Capabilities{
Expand All @@ -84,6 +86,7 @@ func TestValidate(t *testing.T) {
},
Agent: Agent{
Executable: "${file_path}",
ConfigApplyTimeout: 2 * time.Second,
OrphanDetectionInterval: 5 * time.Second,
},
Capabilities: Capabilities{
Expand All @@ -109,6 +112,7 @@ func TestValidate(t *testing.T) {
},
Agent: Agent{
Executable: "${file_path}",
ConfigApplyTimeout: 2 * time.Second,
OrphanDetectionInterval: 5 * time.Second,
},
Capabilities: Capabilities{
Expand Down Expand Up @@ -138,6 +142,7 @@ func TestValidate(t *testing.T) {
},
Agent: Agent{
Executable: "${file_path}",
ConfigApplyTimeout: 2 * time.Second,
OrphanDetectionInterval: 5 * time.Second,
},
Capabilities: Capabilities{
Expand All @@ -164,6 +169,7 @@ func TestValidate(t *testing.T) {
Agent: Agent{
Executable: "",
OrphanDetectionInterval: 5 * time.Second,
ConfigApplyTimeout: 2 * time.Second,
BootstrapTimeout: 5 * time.Second,
},
Capabilities: Capabilities{
Expand All @@ -190,6 +196,7 @@ func TestValidate(t *testing.T) {
Agent: Agent{
Executable: "./path/does/not/exist",
OrphanDetectionInterval: 5 * time.Second,
ConfigApplyTimeout: 2 * time.Second,
BootstrapTimeout: 5 * time.Second,
},
Capabilities: Capabilities{
Expand All @@ -215,6 +222,7 @@ func TestValidate(t *testing.T) {
},
Agent: Agent{
Executable: "${file_path}",
ConfigApplyTimeout: 2 * time.Second,
OrphanDetectionInterval: -1,
},
Capabilities: Capabilities{
Expand Down Expand Up @@ -242,6 +250,7 @@ func TestValidate(t *testing.T) {
Executable: "${file_path}",
OrphanDetectionInterval: 5 * time.Second,
HealthCheckPort: 65536,
ConfigApplyTimeout: 2 * time.Second,
BootstrapTimeout: 5 * time.Second,
},
Capabilities: Capabilities{
Expand Down Expand Up @@ -269,6 +278,7 @@ func TestValidate(t *testing.T) {
Executable: "${file_path}",
OrphanDetectionInterval: 5 * time.Second,
HealthCheckPort: 0,
ConfigApplyTimeout: 2 * time.Second,
BootstrapTimeout: 5 * time.Second,
},
Capabilities: Capabilities{
Expand All @@ -295,6 +305,7 @@ func TestValidate(t *testing.T) {
Executable: "${file_path}",
OrphanDetectionInterval: 5 * time.Second,
HealthCheckPort: 29848,
ConfigApplyTimeout: 2 * time.Second,
BootstrapTimeout: 5 * time.Second,
},
Capabilities: Capabilities{
Expand All @@ -320,6 +331,7 @@ func TestValidate(t *testing.T) {
Agent: Agent{
Executable: "${file_path}",
OrphanDetectionInterval: 5 * time.Second,
ConfigApplyTimeout: 2 * time.Second,
BootstrapTimeout: -5 * time.Second,
},
Capabilities: Capabilities{
Expand All @@ -343,6 +355,7 @@ func TestValidate(t *testing.T) {
Agent: Agent{
Executable: "${file_path}",
OrphanDetectionInterval: 5 * time.Second,
ConfigApplyTimeout: 2 * time.Second,
OpAMPServerPort: 65536,
BootstrapTimeout: 5 * time.Second,
},
Expand All @@ -367,6 +380,7 @@ func TestValidate(t *testing.T) {
Agent: Agent{
Executable: "${file_path}",
OrphanDetectionInterval: 5 * time.Second,
ConfigApplyTimeout: 2 * time.Second,
OpAMPServerPort: 0,
BootstrapTimeout: 5 * time.Second,
},
Expand All @@ -378,6 +392,33 @@ func TestValidate(t *testing.T) {
},
},
},
{
name: "Invalid config apply timeout",
config: Supervisor{
Server: OpAMPServer{
Endpoint: "wss://localhost:9090/opamp",
Headers: http.Header{
"Header1": []string{"HeaderValue"},
},
TLSSetting: configtls.ClientConfig{
Insecure: true,
},
},
Agent: Agent{
Executable: "${file_path}",
OrphanDetectionInterval: 5 * time.Second,
OpAMPServerPort: 8080,
BootstrapTimeout: 5 * time.Second,
},
Capabilities: Capabilities{
AcceptsRemoteConfig: true,
},
Storage: Storage{
Directory: "/etc/opamp-supervisor/storage",
},
},
expectedError: "agent::config_apply_timeout must be valid duration",
},
}

// create some fake files for validating agent config
Expand Down
Loading

0 comments on commit a8306c9

Please sign in to comment.