Skip to content

Commit

Permalink
Handle OpAMP connection settings
Browse files Browse the repository at this point in the history
  • Loading branch information
srikanthccv committed Dec 19, 2023
1 parent 45d1533 commit 5e06efa
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 6 deletions.
3 changes: 3 additions & 0 deletions cmd/opampsupervisor/supervisor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package config

import (
"net/http"

"go.opentelemetry.io/collector/config/configtls"
)

Expand All @@ -25,6 +27,7 @@ type Capabilities struct {

type OpAMPServer struct {
Endpoint string
Headers http.Header
TLSSetting configtls.TLSClientSetting `mapstructure:"tls,omitempty"`
}

Expand Down
69 changes: 63 additions & 6 deletions cmd/opampsupervisor/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/open-telemetry/opamp-go/server"
serverTypes "github.com/open-telemetry/opamp-go/server/types"
"go.opentelemetry.io/collector/config/configopaque"
semconv "go.opentelemetry.io/collector/semconv/v1.21.0"
"go.uber.org/zap"

Expand Down Expand Up @@ -364,12 +365,8 @@ func (s *Supervisor) startOpAMP() error {
OnErrorFunc: func(err *protobufs.ServerErrorResponse) {
s.logger.Error("Server returned an error response", zap.String("message", err.ErrorMessage))
},
OnMessageFunc: s.onMessage,
OnOpampConnectionSettingsFunc: func(ctx context.Context, settings *protobufs.OpAMPConnectionSettings) error {
// TODO: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21043
s.logger.Debug("Received ConnectionSettings request")
return nil
},
OnMessageFunc: s.onMessage,
OnOpampConnectionSettingsFunc: s.onOpampConnectionSettings,
OnOpampConnectionSettingsAcceptedFunc: func(settings *protobufs.OpAMPConnectionSettings) {
// TODO: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21043
s.logger.Debug("ConnectionSettings accepted")
Expand Down Expand Up @@ -413,6 +410,66 @@ func (s *Supervisor) startOpAMP() error {
return nil
}

func (s *Supervisor) stopOpAMP() error {
s.logger.Debug("Stopping OpAMP client...")
return s.opampClient.Stop(context.Background())
}

func (s *Supervisor) getHeadersFromSettings(protoHeaders *protobufs.Headers) http.Header {
var headers http.Header
for _, header := range protoHeaders.Headers {
headers.Add(header.Key, header.Value)
}
return headers
}

func (s *Supervisor) onOpampConnectionSettings(ctx context.Context, settings *protobufs.OpAMPConnectionSettings) error {
if settings == nil {
s.logger.Debug("Received ConnectionSettings request with nil settings")
return nil
}

newServerConfig := &config.OpAMPServer{}

if settings.DestinationEndpoint != "" {
newServerConfig.Endpoint = settings.DestinationEndpoint
}
if settings.Headers != nil {
newServerConfig.Headers = s.getHeadersFromSettings(settings.Headers)
}
if settings.Certificate != nil {
if len(settings.Certificate.CaPublicKey) != 0 {
newServerConfig.TLSSetting.CAPem = configopaque.String(settings.Certificate.CaPublicKey)
}
if len(settings.Certificate.PublicKey) != 0 {
newServerConfig.TLSSetting.CertPem = configopaque.String(settings.Certificate.PublicKey)
}
if len(settings.Certificate.PrivateKey) != 0 {
newServerConfig.TLSSetting.KeyPem = configopaque.String(settings.Certificate.PrivateKey)
}
}

s.stopOpAMP()

// take a copy of the current OpAMP server config
oldServerConfig := s.config.Server
// update the OpAMP server config
s.config.Server = newServerConfig

if err := s.startOpAMP(); err != nil {
s.logger.Error("Cannot connect to the OpAMP server using the new settings", zap.Error(err))
// revert the OpAMP server config
s.config.Server = oldServerConfig
// start the OpAMP client with the old settings
if err := s.startOpAMP(); err != nil {
s.logger.Error("Cannot reconnect to the OpAMP server after restoring old settings", zap.Error(err))
return err
}
}

return nil
}

// TODO: Persist instance ID. https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21073
func (s *Supervisor) createInstanceID() (ulid.ULID, error) {
entropy := ulid.Monotonic(rand.New(rand.NewSource(0)), 0)
Expand Down

0 comments on commit 5e06efa

Please sign in to comment.