diff --git a/.chloggen/supervisor-accepts-conn.yaml b/.chloggen/supervisor-accepts-conn.yaml new file mode 100644 index 000000000000..60dc5d6bc7e4 --- /dev/null +++ b/.chloggen/supervisor-accepts-conn.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: cmd/opampsupervisor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Handle OpAMP connection settings. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [21043] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index 523cc8586173..6119ed1c5490 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -82,12 +82,12 @@ func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, ca s := server.New(testLogger{t: t}) onConnectedFunc := callbacks.OnConnectedFunc callbacks.OnConnectedFunc = func(ctx context.Context, conn types.Connection) { - agentConn.Store(conn) - isAgentConnected.Store(true) - connectedChan <- true if onConnectedFunc != nil { onConnectedFunc(ctx, conn) } + agentConn.Store(conn) + isAgentConnected.Store(true) + connectedChan <- true } onConnectionCloseFunc := callbacks.OnConnectionCloseFunc callbacks.OnConnectionCloseFunc = func(conn types.Connection) { @@ -473,3 +473,48 @@ func waitForSupervisorConnection(connection chan bool, connected bool) { } } } + +func TestSupervisorOpAMPConnectionSettings(t *testing.T) { + var connectedToNewServer atomic.Bool + initialServer := newOpAMPServer( + t, + defaultConnectingHandler, + server.ConnectionCallbacksStruct{}) + + s := newSupervisor(t, "accepts_conn", map[string]string{"url": initialServer.addr}) + defer s.Shutdown() + + waitForSupervisorConnection(initialServer.supervisorConnected, true) + + newServer := newOpAMPServer( + t, + defaultConnectingHandler, + server.ConnectionCallbacksStruct{ + OnConnectedFunc: func(_ context.Context, _ types.Connection) { + connectedToNewServer.Store(true) + }, + OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + return &protobufs.ServerToAgent{} + }, + }) + + initialServer.sendToSupervisor(&protobufs.ServerToAgent{ + ConnectionSettings: &protobufs.ConnectionSettingsOffers{ + Opamp: &protobufs.OpAMPConnectionSettings{ + DestinationEndpoint: "ws://" + newServer.addr + "/v1/opamp", + Headers: &protobufs.Headers{ + Headers: []*protobufs.Header{ + { + Key: "x-foo", + Value: "bar", + }, + }, + }, + }, + }, + }) + + require.Eventually(t, func() bool { + return connectedToNewServer.Load() == true + }, 10*time.Second, 500*time.Millisecond, "Collector did not connect to new OpAMP server") +} diff --git a/cmd/opampsupervisor/go.mod b/cmd/opampsupervisor/go.mod index c3d8397b6578..6182579f1fde 100644 --- a/cmd/opampsupervisor/go.mod +++ b/cmd/opampsupervisor/go.mod @@ -11,6 +11,7 @@ require ( github.com/oklog/ulid/v2 v2.1.0 github.com/open-telemetry/opamp-go v0.14.0 github.com/stretchr/testify v1.9.0 + go.opentelemetry.io/collector/config/configopaque v1.4.1-0.20240404121116-4f1a8936d26b go.opentelemetry.io/collector/config/configtls v0.97.1-0.20240404121116-4f1a8936d26b go.opentelemetry.io/collector/semconv v0.97.1-0.20240404121116-4f1a8936d26b go.uber.org/goleak v1.3.0 @@ -27,7 +28,6 @@ require ( github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect - go.opentelemetry.io/collector/config/configopaque v1.4.1-0.20240404121116-4f1a8936d26b // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.23.0 // indirect golang.org/x/sys v0.18.0 // indirect diff --git a/cmd/opampsupervisor/supervisor/config/config.go b/cmd/opampsupervisor/supervisor/config/config.go index 7d5000574231..1948b42a0701 100644 --- a/cmd/opampsupervisor/supervisor/config/config.go +++ b/cmd/opampsupervisor/supervisor/config/config.go @@ -4,6 +4,8 @@ package config import ( + "net/http" + "go.opentelemetry.io/collector/config/configtls" ) @@ -16,15 +18,17 @@ type Supervisor struct { // Capabilities is the set of capabilities that the Supervisor supports. type Capabilities struct { - AcceptsRemoteConfig *bool `mapstructure:"accepts_remote_config"` - ReportsEffectiveConfig *bool `mapstructure:"reports_effective_config"` - ReportsOwnMetrics *bool `mapstructure:"reports_own_metrics"` - ReportsHealth *bool `mapstructure:"reports_health"` - ReportsRemoteConfig *bool `mapstructure:"reports_remote_config"` + AcceptsRemoteConfig *bool `mapstructure:"accepts_remote_config"` + AcceptsOpAMPConnectionSettings *bool `mapstructure:"accepts_opamp_connection_settings"` + ReportsEffectiveConfig *bool `mapstructure:"reports_effective_config"` + ReportsOwnMetrics *bool `mapstructure:"reports_own_metrics"` + ReportsHealth *bool `mapstructure:"reports_health"` + ReportsRemoteConfig *bool `mapstructure:"reports_remote_config"` } type OpAMPServer struct { Endpoint string + Headers http.Header TLSSetting configtls.ClientConfig `mapstructure:"tls,omitempty"` } diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index 66d2b1cdf0fd..2c4d132fc88f 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -30,6 +30,8 @@ import ( "github.com/open-telemetry/opamp-go/protobufs" "github.com/open-telemetry/opamp-go/server" serverTypes "github.com/open-telemetry/opamp-go/server/types" + "go.opentelemetry.io/collector/config/configopaque" + "go.opentelemetry.io/collector/config/configtls" semconv "go.opentelemetry.io/collector/semconv/v1.21.0" "go.uber.org/zap" @@ -105,6 +107,8 @@ type Supervisor struct { agentHasStarted bool agentStartHealthCheckAttempts int + + connectedToOpAMPServer chan struct{} } func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { @@ -114,6 +118,7 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { effectiveConfigFilePath: "effective.yaml", agentConfigOwnMetricsSection: &atomic.Value{}, effectiveConfig: &atomic.Value{}, + connectedToOpAMPServer: make(chan struct{}), } if err := s.createTemplates(); err != nil { @@ -152,6 +157,10 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { return nil, fmt.Errorf("cannot start OpAMP client: %w", err) } + if connErr := s.waitForOpAMPConnection(); connErr != nil { + return nil, fmt.Errorf("failed to connect to the OpAMP server: %w", err) + } + s.commander, err = commander.NewCommander( s.logger, s.config.Agent, @@ -341,6 +350,10 @@ func (s *Supervisor) Capabilities() protobufs.AgentCapabilities { if c.ReportsRemoteConfig != nil && *c.ReportsRemoteConfig { supportedCapabilities |= protobufs.AgentCapabilities_AgentCapabilities_ReportsRemoteConfig } + + if c.AcceptsOpAMPConnectionSettings != nil && *c.AcceptsOpAMPConnectionSettings { + supportedCapabilities |= protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings + } } return supportedCapabilities } @@ -353,12 +366,15 @@ func (s *Supervisor) startOpAMP() error { return err } + s.logger.Debug("Connecting to OpAMP server...", zap.String("endpoint", s.config.Server.Endpoint), zap.Any("headers", s.config.Server.Headers)) settings := types.StartSettings{ OpAMPServerURL: s.config.Server.Endpoint, + Header: s.config.Server.Headers, TLSConfig: tlsConfig, InstanceUid: s.instanceID.String(), Callbacks: types.CallbacksStruct{ OnConnectFunc: func(_ context.Context) { + s.connectedToOpAMPServer <- struct{}{} s.logger.Debug("Connected to the server.") }, OnConnectFailedFunc: func(_ context.Context, err error) { @@ -368,9 +384,9 @@ func (s *Supervisor) startOpAMP() error { s.logger.Error("Server returned an error response", zap.String("message", err.ErrorMessage)) }, OnMessageFunc: s.onMessage, - OnOpampConnectionSettingsFunc: func(_ context.Context, _ *protobufs.OpAMPConnectionSettings) error { - // TODO: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21043 - s.logger.Debug("Received ConnectionSettings request") + OnOpampConnectionSettingsFunc: func(ctx context.Context, settings *protobufs.OpAMPConnectionSettings) error { + //nolint:errcheck + go s.onOpampConnectionSettings(ctx, settings) return nil }, OnCommandFunc: func(_ context.Context, command *protobufs.ServerToAgentCommand) error { @@ -412,6 +428,88 @@ func (s *Supervisor) startOpAMP() error { return nil } +func (s *Supervisor) stopOpAMP() error { + s.logger.Debug("Stopping OpAMP client...") + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + err := s.opampClient.Stop(ctx) + // TODO(srikanthccv): remove context.DeadlineExceeded after https://github.com/open-telemetry/opamp-go/pull/213 + if err != nil && !errors.Is(err, context.DeadlineExceeded) { + return err + } + s.logger.Debug("OpAMP client stopped.") + return nil +} + +func (s *Supervisor) getHeadersFromSettings(protoHeaders *protobufs.Headers) http.Header { + headers := make(http.Header) + for _, header := range protoHeaders.Headers { + headers.Add(header.Key, header.Value) + } + return headers +} + +func (s *Supervisor) onOpampConnectionSettings(_ context.Context, settings *protobufs.OpAMPConnectionSettings) error { + if settings == nil { + s.logger.Debug("Received ConnectionSettings request with nil settings") + return nil + } + + newServerConfig := &config.OpAMPServer{} + + if settings.DestinationEndpoint != "" { + newServerConfig.Endpoint = settings.DestinationEndpoint + } + if settings.Headers != nil { + newServerConfig.Headers = s.getHeadersFromSettings(settings.Headers) + } + if settings.Certificate != nil { + if len(settings.Certificate.CaPublicKey) != 0 { + newServerConfig.TLSSetting.CAPem = configopaque.String(settings.Certificate.CaPublicKey) + } + if len(settings.Certificate.PublicKey) != 0 { + newServerConfig.TLSSetting.CertPem = configopaque.String(settings.Certificate.PublicKey) + } + if len(settings.Certificate.PrivateKey) != 0 { + newServerConfig.TLSSetting.KeyPem = configopaque.String(settings.Certificate.PrivateKey) + } + } else { + newServerConfig.TLSSetting = configtls.ClientConfig{Insecure: true} + } + + if err := s.stopOpAMP(); err != nil { + s.logger.Error("Cannot stop the OpAMP client", zap.Error(err)) + return err + } + + // take a copy of the current OpAMP server config + oldServerConfig := s.config.Server + // update the OpAMP server config + s.config.Server = newServerConfig + + if err := s.startOpAMP(); err != nil { + s.logger.Error("Cannot connect to the OpAMP server using the new settings", zap.Error(err)) + // revert the OpAMP server config + s.config.Server = oldServerConfig + // start the OpAMP client with the old settings + if err := s.startOpAMP(); err != nil { + s.logger.Error("Cannot reconnect to the OpAMP server after restoring old settings", zap.Error(err)) + return err + } + } + return s.waitForOpAMPConnection() +} + +func (s *Supervisor) waitForOpAMPConnection() error { + // wait for the OpAMP client to connect to the server or timeout + select { + case <-s.connectedToOpAMPServer: + return nil + case <-time.After(10 * time.Second): + return errors.New("timed out waiting for the server to connect") + } +} + // TODO: Persist instance ID. https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21073 func (s *Supervisor) createInstanceID() (ulid.ULID, error) { entropy := ulid.Monotonic(rand.New(rand.NewSource(0)), 0) @@ -779,7 +877,7 @@ func (s *Supervisor) Shutdown() { s.logger.Error("Could not report health to OpAMP server", zap.Error(err)) } - err = s.opampClient.Stop(context.Background()) + err = s.stopOpAMP() if err != nil { s.logger.Error("Could not stop the OpAMP client", zap.Error(err)) diff --git a/cmd/opampsupervisor/testdata/supervisor/supervisor_accepts_conn.yaml b/cmd/opampsupervisor/testdata/supervisor/supervisor_accepts_conn.yaml new file mode 100644 index 000000000000..0282577b252a --- /dev/null +++ b/cmd/opampsupervisor/testdata/supervisor/supervisor_accepts_conn.yaml @@ -0,0 +1,15 @@ +server: + endpoint: ws://{{.url}}/v1/opamp + tls: + insecure: true + +capabilities: + reports_effective_config: true + reports_own_metrics: true + reports_health: true + accepts_remote_config: true + reports_remote_config: true + accepts_opamp_connection_settings: true + +agent: + executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}}