Skip to content

Commit

Permalink
Replace pulling metrics port with using direct config (open-telemetry…
Browse files Browse the repository at this point in the history
…#2856)

* checkpoint: with working tests, minimal

* chlog

* issue

* missing change
  • Loading branch information
jaronoff97 authored Apr 16, 2024
1 parent b2b0037 commit d1c4cee
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 91 deletions.
16 changes: 16 additions & 0 deletions .chloggen/2603-part-one.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. collector, target allocator, auto-instrumentation, opamp, github action)
component: collector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Changes metric port logic to use intermediary struct.

# One or more tracking issues related to the change
issues: [2603]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
77 changes: 60 additions & 17 deletions apis/v1beta1/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ type AnyConfig struct {
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *AnyConfig) DeepCopyInto(out *AnyConfig) {
*out = *in
if in.Object != nil {
in, out := &in.Object, &out.Object
func (c *AnyConfig) DeepCopyInto(out *AnyConfig) {
*out = *c
if c.Object != nil {
in, out := &c.Object, &out.Object
*out = make(map[string]interface{}, len(*in))
for key, val := range *in {
(*out)[key] = val
Expand All @@ -42,12 +42,12 @@ func (in *AnyConfig) DeepCopyInto(out *AnyConfig) {
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AnyConfig.
func (in *AnyConfig) DeepCopy() *AnyConfig {
if in == nil {
func (c *AnyConfig) DeepCopy() *AnyConfig {
if c == nil {
return nil
}
out := new(AnyConfig)
in.DeepCopyInto(out)
c.DeepCopyInto(out)
return out
}

Expand Down Expand Up @@ -88,7 +88,7 @@ type Config struct {
}

// Yaml encodes the current object and returns it as a string.
func (c Config) Yaml() (string, error) {
func (c *Config) Yaml() (string, error) {
var buf bytes.Buffer
yamlEncoder := yaml.NewEncoder(&buf)
yamlEncoder.SetIndent(2)
Expand All @@ -98,16 +98,8 @@ func (c Config) Yaml() (string, error) {
return buf.String(), nil
}

type Service struct {
Extensions *[]string `json:"extensions,omitempty" yaml:"extensions,omitempty"`
// +kubebuilder:pruning:PreserveUnknownFields
Telemetry *AnyConfig `json:"telemetry,omitempty" yaml:"telemetry,omitempty"`
// +kubebuilder:pruning:PreserveUnknownFields
Pipelines AnyConfig `json:"pipelines" yaml:"pipelines"`
}

// Returns null objects in the config.
func (c Config) nullObjects() []string {
func (c *Config) nullObjects() []string {
var nullKeys []string
if nulls := hasNullValue(c.Receivers.Object); len(nulls) > 0 {
nullKeys = append(nullKeys, addPrefix("receivers.", nulls)...)
Expand Down Expand Up @@ -135,6 +127,57 @@ func (c Config) nullObjects() []string {
return nullKeys
}

type Service struct {
Extensions *[]string `json:"extensions,omitempty" yaml:"extensions,omitempty"`
// +kubebuilder:pruning:PreserveUnknownFields
Telemetry *AnyConfig `json:"telemetry,omitempty" yaml:"telemetry,omitempty"`
// +kubebuilder:pruning:PreserveUnknownFields
Pipelines AnyConfig `json:"pipelines" yaml:"pipelines"`
}

// MetricsConfig comes from the collector.
type MetricsConfig struct {
// Level is the level of telemetry metrics, the possible values are:
// - "none" indicates that no telemetry data should be collected;
// - "basic" is the recommended and covers the basics of the service telemetry.
// - "normal" adds some other indicators on top of basic.
// - "detailed" adds dimensions and views to the previous levels.
Level string `json:"level,omitempty" yaml:"level,omitempty"`

// Address is the [address]:port that metrics exposition should be bound to.
Address string `json:"address,omitempty" yaml:"address,omitempty"`
}

// Telemetry is an intermediary type that allows for easy access to the collector's telemetry settings.
type Telemetry struct {
Metrics MetricsConfig `json:"metrics,omitempty" yaml:"metrics,omitempty"`

// Resource specifies user-defined attributes to include with all emitted telemetry.
// Note that some attributes are added automatically (e.g. service.version) even
// if they are not specified here. In order to suppress such attributes the
// attribute must be specified in this map with null YAML value (nil string pointer).
Resource map[string]*string `json:"resource,omitempty" yaml:"resource,omitempty"`
}

// GetTelemetry serves as a helper function to access the fields we care about in the underlying telemetry struct.
// This exists to avoid needing to worry extra fields in the telemetry struct.
func (s *Service) GetTelemetry() *Telemetry {
if s.Telemetry == nil {
return nil
}
// Convert map to JSON bytes
jsonData, err := json.Marshal(s.Telemetry)
if err != nil {
return nil
}
t := &Telemetry{}
// Unmarshal JSON into the provided struct
if err := json.Unmarshal(jsonData, t); err != nil {
return nil
}
return t
}

func hasNullValue(cfg map[string]interface{}) []string {
var nullKeys []string
for k, v := range cfg {
Expand Down
26 changes: 26 additions & 0 deletions apis/v1beta1/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,29 @@ service:

assert.Equal(t, expected, yamlCollector)
}

func TestGetTelemetryFromYAML(t *testing.T) {
collectorYaml, err := os.ReadFile("./testdata/otelcol-demo.yaml")
require.NoError(t, err)

cfg := &Config{}
err = go_yaml.Unmarshal(collectorYaml, cfg)
require.NoError(t, err)
telemetry := &Telemetry{
Metrics: MetricsConfig{
Level: "detailed",
Address: "0.0.0.0:8888",
},
}
assert.Equal(t, telemetry, cfg.Service.GetTelemetry())
}

func TestGetTelemetryFromYAMLIsNil(t *testing.T) {
collectorYaml, err := os.ReadFile("./testdata/otelcol-couchbase.yaml")
require.NoError(t, err)

cfg := &Config{}
err = go_yaml.Unmarshal(collectorYaml, cfg)
require.NoError(t, err)
assert.Nil(t, cfg.Service.GetTelemetry())
}
47 changes: 47 additions & 0 deletions apis/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 5 additions & 23 deletions internal/manifests/collector/adapters/config_to_ports.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"strings"

"github.com/go-logr/logr"
"github.com/mitchellh/mapstructure"
corev1 "k8s.io/api/core/v1"

"github.com/open-telemetry/opentelemetry-operator/apis/v1beta1"
Expand Down Expand Up @@ -153,29 +152,12 @@ func ConfigToPorts(logger logr.Logger, config map[interface{}]interface{}) ([]v1
}

// ConfigToMetricsPort gets the port number for the metrics endpoint from the collector config if it has been set.
func ConfigToMetricsPort(logger logr.Logger, config map[interface{}]interface{}) (int32, error) {
// we don't need to unmarshal the whole config, just follow the keys down to
// the metrics address.
type metricsCfg struct {
Address string
}
type telemetryCfg struct {
Metrics metricsCfg
}
type serviceCfg struct {
Telemetry telemetryCfg
}
type cfg struct {
Service serviceCfg
}

var cOut cfg
err := mapstructure.Decode(config, &cOut)
if err != nil {
return 0, err
func ConfigToMetricsPort(config v1beta1.Service) (int32, error) {
if config.GetTelemetry() == nil {
// telemetry isn't set, use the default
return 8888, nil
}

_, port, netErr := net.SplitHostPort(cOut.Service.Telemetry.Metrics.Address)
_, port, netErr := net.SplitHostPort(config.GetTelemetry().Metrics.Address)
if netErr != nil && strings.Contains(netErr.Error(), "missing port in address") {
return 8888, nil
} else if netErr != nil {
Expand Down
69 changes: 34 additions & 35 deletions internal/manifests/collector/adapters/config_to_ports_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
logf "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/open-telemetry/opentelemetry-operator/apis/v1beta1"
"github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/adapters"
"github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser"
"github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser/receiver"
Expand Down Expand Up @@ -210,34 +211,33 @@ func TestParserFailed(t *testing.T) {
assert.NoError(t, err)
assert.True(t, mockParserCalled)
}

func TestConfigToMetricsPort(t *testing.T) {
t.Run("custom port specified", func(t *testing.T) {
config := map[interface{}]interface{}{
"service": map[interface{}]interface{}{
"telemetry": map[interface{}]interface{}{
"metrics": map[interface{}]interface{}{
"address": "0.0.0.0:9090",
},
},
},
}

port, err := adapters.ConfigToMetricsPort(logger, config)
assert.NoError(t, err)
assert.Equal(t, int32(9090), port)
})

for _, tt := range []struct {
desc string
config map[interface{}]interface{}
desc string
expectedPort int32
config v1beta1.Service
}{
{
"custom port",
9090,
v1beta1.Service{
Telemetry: &v1beta1.AnyConfig{
Object: map[string]interface{}{
"metrics": map[string]interface{}{
"address": "0.0.0.0:9090",
},
},
},
},
},
{
"bad address",
map[interface{}]interface{}{
"service": map[interface{}]interface{}{
"telemetry": map[interface{}]interface{}{
"metrics": map[interface{}]interface{}{
8888,
v1beta1.Service{
Telemetry: &v1beta1.AnyConfig{
Object: map[string]interface{}{
"metrics": map[string]interface{}{
"address": "0.0.0.0",
},
},
Expand All @@ -246,10 +246,11 @@ func TestConfigToMetricsPort(t *testing.T) {
},
{
"missing address",
map[interface{}]interface{}{
"service": map[interface{}]interface{}{
"telemetry": map[interface{}]interface{}{
"metrics": map[interface{}]interface{}{
8888,
v1beta1.Service{
Telemetry: &v1beta1.AnyConfig{
Object: map[string]interface{}{
"metrics": map[string]interface{}{
"level": "detailed",
},
},
Expand All @@ -258,24 +259,22 @@ func TestConfigToMetricsPort(t *testing.T) {
},
{
"missing metrics",
map[interface{}]interface{}{
"service": map[interface{}]interface{}{
"telemetry": map[interface{}]interface{}{},
},
8888,
v1beta1.Service{
Telemetry: &v1beta1.AnyConfig{},
},
},
{
"missing telemetry",
map[interface{}]interface{}{
"service": map[interface{}]interface{}{},
},
8888,
v1beta1.Service{},
},
} {
t.Run(tt.desc, func(t *testing.T) {
// these are acceptable failures, we return to the collector's default metric port
port, err := adapters.ConfigToMetricsPort(logger, tt.config)
port, err := adapters.ConfigToMetricsPort(tt.config)
assert.NoError(t, err)
assert.Equal(t, int32(8888), port)
assert.Equal(t, tt.expectedPort, port)
})
}
}
Expand Down
Loading

0 comments on commit d1c4cee

Please sign in to comment.