Skip to content

Commit

Permalink
[cmd/opampsupervisor] Handle OpAMP connection settings (#30237)
Browse files Browse the repository at this point in the history
**Link to tracking Issue:** <Issue number if applicable>

Part of #21043; based on top of
#29848
to add test

**Testing:** <Describe what testing was performed and which tests were
added.>

Added integration test

---------

Co-authored-by: Evan Bradley <[email protected]>
  • Loading branch information
srikanthccv and evan-bradley authored Apr 9, 2024
1 parent 9c17fd8 commit 4edca20
Showing 6 changed files with 202 additions and 13 deletions.
27 changes: 27 additions & 0 deletions .chloggen/supervisor-accepts-conn.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: cmd/opampsupervisor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Handle OpAMP connection settings.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [21043]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
51 changes: 48 additions & 3 deletions cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
@@ -82,12 +82,12 @@ func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, ca
s := server.New(testLogger{t: t})
onConnectedFunc := callbacks.OnConnectedFunc
callbacks.OnConnectedFunc = func(ctx context.Context, conn types.Connection) {
agentConn.Store(conn)
isAgentConnected.Store(true)
connectedChan <- true
if onConnectedFunc != nil {
onConnectedFunc(ctx, conn)
}
agentConn.Store(conn)
isAgentConnected.Store(true)
connectedChan <- true
}
onConnectionCloseFunc := callbacks.OnConnectionCloseFunc
callbacks.OnConnectionCloseFunc = func(conn types.Connection) {
@@ -473,3 +473,48 @@ func waitForSupervisorConnection(connection chan bool, connected bool) {
}
}
}

func TestSupervisorOpAMPConnectionSettings(t *testing.T) {
var connectedToNewServer atomic.Bool
initialServer := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{})

s := newSupervisor(t, "accepts_conn", map[string]string{"url": initialServer.addr})
defer s.Shutdown()

waitForSupervisorConnection(initialServer.supervisorConnected, true)

newServer := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnConnectedFunc: func(_ context.Context, _ types.Connection) {
connectedToNewServer.Store(true)
},
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
return &protobufs.ServerToAgent{}
},
})

initialServer.sendToSupervisor(&protobufs.ServerToAgent{
ConnectionSettings: &protobufs.ConnectionSettingsOffers{
Opamp: &protobufs.OpAMPConnectionSettings{
DestinationEndpoint: "ws://" + newServer.addr + "/v1/opamp",
Headers: &protobufs.Headers{
Headers: []*protobufs.Header{
{
Key: "x-foo",
Value: "bar",
},
},
},
},
},
})

require.Eventually(t, func() bool {
return connectedToNewServer.Load() == true
}, 10*time.Second, 500*time.Millisecond, "Collector did not connect to new OpAMP server")
}
2 changes: 1 addition & 1 deletion cmd/opampsupervisor/go.mod
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@ require (
github.com/oklog/ulid/v2 v2.1.0
github.com/open-telemetry/opamp-go v0.14.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/config/configopaque v1.4.1-0.20240404121116-4f1a8936d26b
go.opentelemetry.io/collector/config/configtls v0.97.1-0.20240404121116-4f1a8936d26b
go.opentelemetry.io/collector/semconv v0.97.1-0.20240404121116-4f1a8936d26b
go.uber.org/goleak v1.3.0
@@ -27,7 +28,6 @@ require (
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
go.opentelemetry.io/collector/config/configopaque v1.4.1-0.20240404121116-4f1a8936d26b // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
14 changes: 9 additions & 5 deletions cmd/opampsupervisor/supervisor/config/config.go
Original file line number Diff line number Diff line change
@@ -4,6 +4,8 @@
package config

import (
"net/http"

"go.opentelemetry.io/collector/config/configtls"
)

@@ -16,15 +18,17 @@ type Supervisor struct {

// Capabilities is the set of capabilities that the Supervisor supports.
type Capabilities struct {
AcceptsRemoteConfig *bool `mapstructure:"accepts_remote_config"`
ReportsEffectiveConfig *bool `mapstructure:"reports_effective_config"`
ReportsOwnMetrics *bool `mapstructure:"reports_own_metrics"`
ReportsHealth *bool `mapstructure:"reports_health"`
ReportsRemoteConfig *bool `mapstructure:"reports_remote_config"`
AcceptsRemoteConfig *bool `mapstructure:"accepts_remote_config"`
AcceptsOpAMPConnectionSettings *bool `mapstructure:"accepts_opamp_connection_settings"`
ReportsEffectiveConfig *bool `mapstructure:"reports_effective_config"`
ReportsOwnMetrics *bool `mapstructure:"reports_own_metrics"`
ReportsHealth *bool `mapstructure:"reports_health"`
ReportsRemoteConfig *bool `mapstructure:"reports_remote_config"`
}

type OpAMPServer struct {
Endpoint string
Headers http.Header
TLSSetting configtls.ClientConfig `mapstructure:"tls,omitempty"`
}

106 changes: 102 additions & 4 deletions cmd/opampsupervisor/supervisor/supervisor.go
Original file line number Diff line number Diff line change
@@ -30,6 +30,8 @@ import (
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/open-telemetry/opamp-go/server"
serverTypes "github.com/open-telemetry/opamp-go/server/types"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configtls"
semconv "go.opentelemetry.io/collector/semconv/v1.21.0"
"go.uber.org/zap"

@@ -105,6 +107,8 @@ type Supervisor struct {

agentHasStarted bool
agentStartHealthCheckAttempts int

connectedToOpAMPServer chan struct{}
}

func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) {
@@ -114,6 +118,7 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) {
effectiveConfigFilePath: "effective.yaml",
agentConfigOwnMetricsSection: &atomic.Value{},
effectiveConfig: &atomic.Value{},
connectedToOpAMPServer: make(chan struct{}),
}

if err := s.createTemplates(); err != nil {
@@ -152,6 +157,10 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) {
return nil, fmt.Errorf("cannot start OpAMP client: %w", err)
}

if connErr := s.waitForOpAMPConnection(); connErr != nil {
return nil, fmt.Errorf("failed to connect to the OpAMP server: %w", err)
}

s.commander, err = commander.NewCommander(
s.logger,
s.config.Agent,
@@ -341,6 +350,10 @@ func (s *Supervisor) Capabilities() protobufs.AgentCapabilities {
if c.ReportsRemoteConfig != nil && *c.ReportsRemoteConfig {
supportedCapabilities |= protobufs.AgentCapabilities_AgentCapabilities_ReportsRemoteConfig
}

if c.AcceptsOpAMPConnectionSettings != nil && *c.AcceptsOpAMPConnectionSettings {
supportedCapabilities |= protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings
}
}
return supportedCapabilities
}
@@ -353,12 +366,15 @@ func (s *Supervisor) startOpAMP() error {
return err
}

s.logger.Debug("Connecting to OpAMP server...", zap.String("endpoint", s.config.Server.Endpoint), zap.Any("headers", s.config.Server.Headers))
settings := types.StartSettings{
OpAMPServerURL: s.config.Server.Endpoint,
Header: s.config.Server.Headers,
TLSConfig: tlsConfig,
InstanceUid: s.instanceID.String(),
Callbacks: types.CallbacksStruct{
OnConnectFunc: func(_ context.Context) {
s.connectedToOpAMPServer <- struct{}{}
s.logger.Debug("Connected to the server.")
},
OnConnectFailedFunc: func(_ context.Context, err error) {
@@ -368,9 +384,9 @@ func (s *Supervisor) startOpAMP() error {
s.logger.Error("Server returned an error response", zap.String("message", err.ErrorMessage))
},
OnMessageFunc: s.onMessage,
OnOpampConnectionSettingsFunc: func(_ context.Context, _ *protobufs.OpAMPConnectionSettings) error {
// TODO: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21043
s.logger.Debug("Received ConnectionSettings request")
OnOpampConnectionSettingsFunc: func(ctx context.Context, settings *protobufs.OpAMPConnectionSettings) error {
//nolint:errcheck
go s.onOpampConnectionSettings(ctx, settings)
return nil
},
OnCommandFunc: func(_ context.Context, command *protobufs.ServerToAgentCommand) error {
@@ -412,6 +428,88 @@ func (s *Supervisor) startOpAMP() error {
return nil
}

func (s *Supervisor) stopOpAMP() error {
s.logger.Debug("Stopping OpAMP client...")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := s.opampClient.Stop(ctx)
// TODO(srikanthccv): remove context.DeadlineExceeded after https://github.com/open-telemetry/opamp-go/pull/213
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return err
}
s.logger.Debug("OpAMP client stopped.")
return nil
}

func (s *Supervisor) getHeadersFromSettings(protoHeaders *protobufs.Headers) http.Header {
headers := make(http.Header)
for _, header := range protoHeaders.Headers {
headers.Add(header.Key, header.Value)
}
return headers
}

func (s *Supervisor) onOpampConnectionSettings(_ context.Context, settings *protobufs.OpAMPConnectionSettings) error {
if settings == nil {
s.logger.Debug("Received ConnectionSettings request with nil settings")
return nil
}

newServerConfig := &config.OpAMPServer{}

if settings.DestinationEndpoint != "" {
newServerConfig.Endpoint = settings.DestinationEndpoint
}
if settings.Headers != nil {
newServerConfig.Headers = s.getHeadersFromSettings(settings.Headers)
}
if settings.Certificate != nil {
if len(settings.Certificate.CaPublicKey) != 0 {
newServerConfig.TLSSetting.CAPem = configopaque.String(settings.Certificate.CaPublicKey)
}
if len(settings.Certificate.PublicKey) != 0 {
newServerConfig.TLSSetting.CertPem = configopaque.String(settings.Certificate.PublicKey)
}
if len(settings.Certificate.PrivateKey) != 0 {
newServerConfig.TLSSetting.KeyPem = configopaque.String(settings.Certificate.PrivateKey)
}
} else {
newServerConfig.TLSSetting = configtls.ClientConfig{Insecure: true}
}

if err := s.stopOpAMP(); err != nil {
s.logger.Error("Cannot stop the OpAMP client", zap.Error(err))
return err
}

// take a copy of the current OpAMP server config
oldServerConfig := s.config.Server
// update the OpAMP server config
s.config.Server = newServerConfig

if err := s.startOpAMP(); err != nil {
s.logger.Error("Cannot connect to the OpAMP server using the new settings", zap.Error(err))
// revert the OpAMP server config
s.config.Server = oldServerConfig
// start the OpAMP client with the old settings
if err := s.startOpAMP(); err != nil {
s.logger.Error("Cannot reconnect to the OpAMP server after restoring old settings", zap.Error(err))
return err
}
}
return s.waitForOpAMPConnection()
}

func (s *Supervisor) waitForOpAMPConnection() error {
// wait for the OpAMP client to connect to the server or timeout
select {
case <-s.connectedToOpAMPServer:
return nil
case <-time.After(10 * time.Second):
return errors.New("timed out waiting for the server to connect")
}
}

// TODO: Persist instance ID. https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21073
func (s *Supervisor) createInstanceID() (ulid.ULID, error) {
entropy := ulid.Monotonic(rand.New(rand.NewSource(0)), 0)
@@ -779,7 +877,7 @@ func (s *Supervisor) Shutdown() {
s.logger.Error("Could not report health to OpAMP server", zap.Error(err))
}

err = s.opampClient.Stop(context.Background())
err = s.stopOpAMP()

if err != nil {
s.logger.Error("Could not stop the OpAMP client", zap.Error(err))
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
server:
endpoint: ws://{{.url}}/v1/opamp
tls:
insecure: true

capabilities:
reports_effective_config: true
reports_own_metrics: true
reports_health: true
accepts_remote_config: true
reports_remote_config: true
accepts_opamp_connection_settings: true

agent:
executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}}

0 comments on commit 4edca20

Please sign in to comment.